This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 1f80a96f Bump DataFusion version (#453)
1f80a96f is described below
commit 1f80a96f077f1c97fae386c89d478d5f67eaa82a
Author: Andy Grove <[email protected]>
AuthorDate: Thu Oct 27 22:43:01 2022 -0600
Bump DataFusion version (#453)
* bump DataFusion version
* fix
* Fix python
* Formatting
* Python compile
* Update rev
* Compile
* Compile
* Clippy
* Nullability
* format
Co-authored-by: Heres, Daniel <[email protected]>
---
ballista-cli/Cargo.toml | 4 +-
ballista/client/Cargo.toml | 6 +-
ballista/client/src/context.rs | 6 +-
ballista/core/Cargo.toml | 6 +-
ballista/core/proto/datafusion.proto | 280 +++++++++++----------
.../core/src/execution_plans/distributed_query.rs | 2 +-
ballista/core/src/serde/mod.rs | 35 ++-
.../core/src/serde/physical_plan/from_proto.rs | 5 +-
ballista/core/src/serde/physical_plan/mod.rs | 26 +-
ballista/core/src/utils.rs | 4 +-
ballista/executor/Cargo.toml | 8 +-
ballista/scheduler/Cargo.toml | 6 +-
ballista/scheduler/src/display.rs | 4 +-
ballista/scheduler/src/planner.rs | 2 +-
ballista/scheduler/src/scheduler_server/event.rs | 2 +-
ballista/scheduler/src/scheduler_server/mod.rs | 4 +-
ballista/scheduler/src/state/execution_graph.rs | 2 +-
.../scheduler/src/state/execution_graph_dot.rs | 4 +-
ballista/scheduler/src/state/mod.rs | 2 +-
benchmarks/Cargo.toml | 4 +-
benchmarks/src/bin/tpch.rs | 9 +-
examples/Cargo.toml | 2 +-
python/Cargo.toml | 2 +-
python/ballista/tests/test_catalog.py | 6 +-
python/src/dataframe.rs | 2 +-
python/src/dataset_exec.rs | 9 +-
python/src/expression.rs | 11 +-
python/src/functions.rs | 3 +-
python/src/pyarrow_filter_expression.rs | 6 +-
29 files changed, 253 insertions(+), 209 deletions(-)
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index e36d03c5..f0fbaad6 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -33,8 +33,8 @@ ballista = { path = "../ballista/client", version = "0.9.0",
features = [
"standalone",
] }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = "13.0.0"
-datafusion-cli = "13.0.0"
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev =
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
dirs = "4.0.0"
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/client/Cargo.toml b/ballista/client/Cargo.toml
index 8c030b99..33eb9a53 100644
--- a/ballista/client/Cargo.toml
+++ b/ballista/client/Cargo.toml
@@ -31,12 +31,12 @@ rust-version = "1.59"
ballista-core = { path = "../core", version = "0.9.0" }
ballista-executor = { path = "../executor", version = "0.9.0", optional = true
}
ballista-scheduler = { path = "../scheduler", version = "0.9.0", optional =
true }
-datafusion = "13.0.0"
-datafusion-proto = "13.0.0"
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
-sqlparser = "0.25"
+sqlparser = "0.26"
tempfile = "3"
tokio = "1.0"
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index 797a527d..22c849c6 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -33,11 +33,9 @@ use datafusion_proto::protobuf::LogicalPlanNode;
use datafusion::catalog::TableReference;
use datafusion::dataframe::DataFrame;
-use datafusion::datasource::TableProvider;
+use datafusion::datasource::{source_as_provider, TableProvider};
use datafusion::error::{DataFusionError, Result};
-use datafusion::logical_plan::{
- source_as_provider, CreateExternalTable, LogicalPlan, TableScan,
-};
+use datafusion::logical_expr::{CreateExternalTable, LogicalPlan, TableScan};
use datafusion::prelude::{
AvroReadOptions, CsvReadOptions, ParquetReadOptions, SessionConfig,
SessionContext,
};
diff --git a/ballista/core/Cargo.toml b/ballista/core/Cargo.toml
index 955af54e..2d710da9 100644
--- a/ballista/core/Cargo.toml
+++ b/ballista/core/Cargo.toml
@@ -42,13 +42,13 @@ simd = ["datafusion/simd"]
[dependencies]
ahash = { version = "0.8", default-features = false }
-arrow-flight = { version = "24.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "25.0.0", features = ["flight-sql-experimental"] }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = "13.0.0"
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
datafusion-objectstore-hdfs = { version = "0.1.1", default-features = false,
optional = true }
-datafusion-proto = "13.0.0"
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
futures = "0.3"
hashbrown = "0.12"
diff --git a/ballista/core/proto/datafusion.proto
b/ballista/core/proto/datafusion.proto
index 0a92d0d1..b7ae6360 100644
--- a/ballista/core/proto/datafusion.proto
+++ b/ballista/core/proto/datafusion.proto
@@ -69,6 +69,8 @@ message LogicalPlanNode {
SubqueryAliasNode subquery_alias = 21;
CreateViewNode create_view = 22;
DistinctNode distinct = 23;
+ ViewTableScanNode view_scan = 24;
+ CustomTableScanNode custom_scan = 25;
}
}
@@ -109,6 +111,23 @@ message ListingTableScanNode {
}
}
+message ViewTableScanNode {
+ string table_name = 1;
+ LogicalPlanNode input = 2;
+ datafusion.Schema schema = 3;
+ ProjectionColumns projection = 4;
+ string definition = 5;
+}
+
+// Logical Plan to Scan a CustomTableProvider registered at runtime
+message CustomTableScanNode {
+ string table_name = 1;
+ ProjectionColumns projection = 2;
+ datafusion.Schema schema = 3;
+ repeated datafusion.LogicalExprNode filters = 4;
+ bytes custom_table_data = 5;
+}
+
message ProjectionNode {
LogicalPlanNode input = 1;
repeated datafusion.LogicalExprNode expr = 2;
@@ -156,6 +175,7 @@ message CreateExternalTableNode {
bool if_not_exists = 7;
string delimiter = 8;
string definition = 9;
+ string file_compression_type = 10;
}
message CreateCatalogSchemaNode {
@@ -653,53 +673,53 @@ message Field {
}
message FixedSizeBinary{
- int32 length = 1;
+ int32 length = 1;
}
message Timestamp{
- TimeUnit time_unit = 1;
- string timezone = 2;
+ TimeUnit time_unit = 1;
+ string timezone = 2;
}
enum DateUnit{
- Day = 0;
- DateMillisecond = 1;
+ Day = 0;
+ DateMillisecond = 1;
}
enum TimeUnit{
- Second = 0;
- Millisecond = 1;
- Microsecond = 2;
- Nanosecond = 3;
+ Second = 0;
+ Millisecond = 1;
+ Microsecond = 2;
+ Nanosecond = 3;
}
enum IntervalUnit{
- YearMonth = 0;
- DayTime = 1;
- MonthDayNano = 2;
+ YearMonth = 0;
+ DayTime = 1;
+ MonthDayNano = 2;
}
message Decimal{
- uint64 whole = 1;
- uint64 fractional = 2;
+ uint64 whole = 1;
+ uint64 fractional = 2;
}
message List{
- Field field_type = 1;
+ Field field_type = 1;
}
message FixedSizeList{
- Field field_type = 1;
- int32 list_size = 2;
+ Field field_type = 1;
+ int32 list_size = 2;
}
message Dictionary{
- ArrowType key = 1;
- ArrowType value = 2;
+ ArrowType key = 1;
+ ArrowType value = 2;
}
message Struct{
- repeated Field sub_field_types = 1;
+ repeated Field sub_field_types = 1;
}
enum UnionMode{
@@ -708,14 +728,17 @@ enum UnionMode{
}
message Union{
- repeated Field union_types = 1;
- UnionMode union_mode = 2;
- repeated int32 type_ids = 3;
+ repeated Field union_types = 1;
+ UnionMode union_mode = 2;
+ repeated int32 type_ids = 3;
}
message ScalarListValue{
- Field field = 1;
- repeated ScalarValue values = 2;
+ // encode null explicitly to distinguish a list with a null value
+ // from a list with no values)
+ bool is_null = 3;
+ Field field = 1;
+ repeated ScalarValue values = 2;
}
message ScalarTimestampValue {
@@ -748,40 +771,40 @@ message StructValue {
}
message ScalarValue{
- oneof value {
- // Null value of any type (type is encoded)
- PrimitiveScalarType null_value = 19;
-
- bool bool_value = 1;
- string utf8_value = 2;
- string large_utf8_value = 3;
- int32 int8_value = 4;
- int32 int16_value = 5;
- int32 int32_value = 6;
- int64 int64_value = 7;
- uint32 uint8_value = 8;
- uint32 uint16_value = 9;
- uint32 uint32_value = 10;
- uint64 uint64_value = 11;
- float float32_value = 12;
- double float64_value = 13;
- //Literal Date32 value always has a unit of day
- int32 date_32_value = 14;
- ScalarListValue list_value = 17;
- ScalarType null_list_value = 18;
-
- Decimal128 decimal128_value = 20;
- int64 date_64_value = 21;
- int32 interval_yearmonth_value = 24;
- int64 interval_daytime_value = 25;
- ScalarTimestampValue timestamp_value = 26;
- ScalarDictionaryValue dictionary_value = 27;
- bytes binary_value = 28;
- bytes large_binary_value = 29;
- int64 time64_value = 30;
- IntervalMonthDayNanoValue interval_month_day_nano = 31;
- StructValue struct_value = 32;
- }
+ oneof value {
+ // Null value of any type (type is encoded)
+ PrimitiveScalarType null_value = 19;
+
+ bool bool_value = 1;
+ string utf8_value = 2;
+ string large_utf8_value = 3;
+ int32 int8_value = 4;
+ int32 int16_value = 5;
+ int32 int32_value = 6;
+ int64 int64_value = 7;
+ uint32 uint8_value = 8;
+ uint32 uint16_value = 9;
+ uint32 uint32_value = 10;
+ uint64 uint64_value = 11;
+ float float32_value = 12;
+ double float64_value = 13;
+ //Literal Date32 value always has a unit of day
+ int32 date_32_value = 14;
+ ScalarListValue list_value = 17;
+ //WAS: ScalarType null_list_value = 18;
+
+ Decimal128 decimal128_value = 20;
+ int64 date_64_value = 21;
+ int32 interval_yearmonth_value = 24;
+ int64 interval_daytime_value = 25;
+ ScalarTimestampValue timestamp_value = 26;
+ ScalarDictionaryValue dictionary_value = 27;
+ bytes binary_value = 28;
+ bytes large_binary_value = 29;
+ int64 time64_value = 30;
+ IntervalMonthDayNanoValue interval_month_day_nano = 31;
+ StructValue struct_value = 32;
+ }
}
message Decimal128{
@@ -794,88 +817,77 @@ message Decimal128{
// List
enum PrimitiveScalarType{
- BOOL = 0; // arrow::Type::BOOL
- UINT8 = 1; // arrow::Type::UINT8
- INT8 = 2; // arrow::Type::INT8
- UINT16 = 3; // represents arrow::Type fields in src/arrow/type.h
- INT16 = 4;
- UINT32 = 5;
- INT32 = 6;
- UINT64 = 7;
- INT64 = 8;
- FLOAT32 = 9;
- FLOAT64 = 10;
- UTF8 = 11;
- LARGE_UTF8 = 12;
- DATE32 = 13;
- TIMESTAMP_MICROSECOND = 14;
- TIMESTAMP_NANOSECOND = 15;
- NULL = 16;
- DECIMAL128 = 17;
- DATE64 = 20;
- TIMESTAMP_SECOND = 21;
- TIMESTAMP_MILLISECOND = 22;
- INTERVAL_YEARMONTH = 23;
- INTERVAL_DAYTIME = 24;
- INTERVAL_MONTHDAYNANO = 28;
-
- BINARY = 25;
- LARGE_BINARY = 26;
-
- TIME64 = 27;
-}
-
-message ScalarType{
- oneof datatype{
- PrimitiveScalarType scalar = 1;
- ScalarListType list = 2;
- }
-}
-
-message ScalarListType{
- repeated string field_names = 3;
- PrimitiveScalarType deepest_type = 2;
+ BOOL = 0; // arrow::Type::BOOL
+ UINT8 = 1; // arrow::Type::UINT8
+ INT8 = 2; // arrow::Type::INT8
+ UINT16 = 3; // represents arrow::Type fields in src/arrow/type.h
+ INT16 = 4;
+ UINT32 = 5;
+ INT32 = 6;
+ UINT64 = 7;
+ INT64 = 8;
+ FLOAT32 = 9;
+ FLOAT64 = 10;
+ UTF8 = 11;
+ LARGE_UTF8 = 12;
+ DATE32 = 13;
+ TIMESTAMP_MICROSECOND = 14;
+ TIMESTAMP_NANOSECOND = 15;
+ NULL = 16;
+ DECIMAL128 = 17;
+ DATE64 = 20;
+ TIMESTAMP_SECOND = 21;
+ TIMESTAMP_MILLISECOND = 22;
+ INTERVAL_YEARMONTH = 23;
+ INTERVAL_DAYTIME = 24;
+ INTERVAL_MONTHDAYNANO = 28;
+
+ BINARY = 25;
+ LARGE_BINARY = 26;
+
+ TIME64 = 27;
}
+
// Broke out into multiple message types so that type
// metadata did not need to be in separate message
// All types that are of the empty message types contain no additional metadata
// about the type
message ArrowType{
- oneof arrow_type_enum{
- EmptyMessage NONE = 1; // arrow::Type::NA
- EmptyMessage BOOL = 2; // arrow::Type::BOOL
- EmptyMessage UINT8 = 3; // arrow::Type::UINT8
- EmptyMessage INT8 = 4; // arrow::Type::INT8
- EmptyMessage UINT16 =5; // represents arrow::Type fields in
src/arrow/type.h
- EmptyMessage INT16 = 6;
- EmptyMessage UINT32 =7;
- EmptyMessage INT32 = 8;
- EmptyMessage UINT64 =9;
- EmptyMessage INT64 =10 ;
- EmptyMessage FLOAT16 =11 ;
- EmptyMessage FLOAT32 =12 ;
- EmptyMessage FLOAT64 =13 ;
- EmptyMessage UTF8 =14 ;
- EmptyMessage LARGE_UTF8 = 32;
- EmptyMessage BINARY =15 ;
- int32 FIXED_SIZE_BINARY =16 ;
- EmptyMessage LARGE_BINARY = 31;
- EmptyMessage DATE32 =17 ;
- EmptyMessage DATE64 =18 ;
- TimeUnit DURATION = 19;
- Timestamp TIMESTAMP =20 ;
- TimeUnit TIME32 =21 ;
- TimeUnit TIME64 =22 ;
- IntervalUnit INTERVAL =23 ;
- Decimal DECIMAL =24 ;
- List LIST =25;
- List LARGE_LIST = 26;
- FixedSizeList FIXED_SIZE_LIST = 27;
- Struct STRUCT =28;
- Union UNION =29;
- Dictionary DICTIONARY =30;
- }
+ oneof arrow_type_enum{
+ EmptyMessage NONE = 1; // arrow::Type::NA
+ EmptyMessage BOOL = 2; // arrow::Type::BOOL
+ EmptyMessage UINT8 = 3; // arrow::Type::UINT8
+ EmptyMessage INT8 = 4; // arrow::Type::INT8
+ EmptyMessage UINT16 =5; // represents arrow::Type fields in
src/arrow/type.h
+ EmptyMessage INT16 = 6;
+ EmptyMessage UINT32 =7;
+ EmptyMessage INT32 = 8;
+ EmptyMessage UINT64 =9;
+ EmptyMessage INT64 =10 ;
+ EmptyMessage FLOAT16 =11 ;
+ EmptyMessage FLOAT32 =12 ;
+ EmptyMessage FLOAT64 =13 ;
+ EmptyMessage UTF8 =14 ;
+ EmptyMessage LARGE_UTF8 = 32;
+ EmptyMessage BINARY =15 ;
+ int32 FIXED_SIZE_BINARY =16 ;
+ EmptyMessage LARGE_BINARY = 31;
+ EmptyMessage DATE32 =17 ;
+ EmptyMessage DATE64 =18 ;
+ TimeUnit DURATION = 19;
+ Timestamp TIMESTAMP =20 ;
+ TimeUnit TIME32 =21 ;
+ TimeUnit TIME64 =22 ;
+ IntervalUnit INTERVAL =23 ;
+ Decimal DECIMAL =24 ;
+ List LIST =25;
+ List LARGE_LIST = 26;
+ FixedSizeList FIXED_SIZE_LIST = 27;
+ Struct STRUCT =28;
+ Union UNION =29;
+ Dictionary DICTIONARY =30;
+ }
}
//Useful for representing an empty enum variant in rust
diff --git a/ballista/core/src/execution_plans/distributed_query.rs
b/ballista/core/src/execution_plans/distributed_query.rs
index bf5dc0cf..cd56cd36 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -29,7 +29,7 @@ use datafusion::arrow::error::{ArrowError, Result as
ArrowResult};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::TaskContext;
-use datafusion::logical_plan::LogicalPlan;
+use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
diff --git a/ballista/core/src/serde/mod.rs b/ballista/core/src/serde/mod.rs
index 3576d727..04043822 100644
--- a/ballista/core/src/serde/mod.rs
+++ b/ballista/core/src/serde/mod.rs
@@ -21,8 +21,8 @@
use crate::{error::BallistaError, serde::scheduler::Action as BallistaAction};
use arrow_flight::sql::ProstMessageExt;
use datafusion::execution::runtime_env::RuntimeEnv;
-use datafusion::logical_plan::FunctionRegistry;
-use datafusion::physical_plan::join_utils::JoinSide;
+use datafusion::execution::FunctionRegistry;
+use datafusion::physical_plan::joins::utils::JoinSide;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::logical_plan::{
AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
@@ -245,12 +245,15 @@ fn str_to_byte(s: &str) -> Result<u8, BallistaError> {
mod tests {
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
+ use datafusion::common::DFSchemaRef;
use datafusion::error::DataFusionError;
- use datafusion::execution::context::{QueryPlanner, SessionState,
TaskContext};
- use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
- use datafusion::logical_plan::plan::Extension;
- use datafusion::logical_plan::{
- col, DFSchemaRef, Expr, FunctionRegistry, LogicalPlan,
UserDefinedLogicalNode,
+ use datafusion::execution::{
+ context::{QueryPlanner, SessionState, TaskContext},
+ runtime_env::{RuntimeConfig, RuntimeEnv},
+ FunctionRegistry,
+ };
+ use datafusion::logical_expr::{
+ col, Expr, Extension, LogicalPlan, UserDefinedLogicalNode,
};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::planner::{DefaultPhysicalPlanner,
ExtensionPlanner};
@@ -548,6 +551,24 @@ mod tests {
Err(DataFusionError::Plan("unsupported plan type".to_string()))
}
}
+
+ fn try_decode_table_provider(
+ &self,
+ _buf: &[u8],
+ _schema: SchemaRef,
+ _ctx: &SessionContext,
+ ) -> Result<Arc<dyn datafusion::datasource::TableProvider>,
DataFusionError>
+ {
+ unimplemented!()
+ }
+
+ fn try_encode_table_provider(
+ &self,
+ _node: Arc<dyn datafusion::datasource::TableProvider>,
+ _buf: &mut Vec<u8>,
+ ) -> Result<(), DataFusionError> {
+ unimplemented!()
+ }
}
impl PhysicalExtensionCodec for TopKExtensionCodec {
diff --git a/ballista/core/src/serde/physical_plan/from_proto.rs
b/ballista/core/src/serde/physical_plan/from_proto.rs
index 1e451adb..693037bd 100644
--- a/ballista/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/core/src/serde/physical_plan/from_proto.rs
@@ -23,11 +23,12 @@ use std::sync::Arc;
use chrono::{TimeZone, Utc};
use datafusion::arrow::datatypes::Schema;
+use datafusion::config::ConfigOptions;
use datafusion::datasource::listing::{FileRange, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::ExecutionProps;
+use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::window_function::WindowFunction;
-use datafusion::logical_plan::FunctionRegistry;
use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
use datafusion::physical_expr::ScalarFunctionExpr;
use datafusion::physical_plan::file_format::FileScanConfig;
@@ -42,6 +43,7 @@ use datafusion::physical_plan::{ColumnStatistics,
PhysicalExpr, Statistics};
use datafusion_proto::from_proto::from_proto_binary_op;
use object_store::path::Path;
use object_store::ObjectMeta;
+use parking_lot::RwLock;
use crate::serde::protobuf::physical_expr_node::ExprType;
@@ -397,6 +399,7 @@ impl TryInto<FileScanConfig> for
&protobuf::FileScanExecConf {
let statistics = convert_required!(self.statistics)?;
Ok(FileScanConfig {
+ config_options: Arc::new(RwLock::new(ConfigOptions::new())), //
TODO add serde
object_store_url: ObjectStoreUrl::parse(&self.object_store_url)?,
file_schema: schema,
file_groups: self
diff --git a/ballista/core/src/serde/physical_plan/mod.rs
b/ballista/core/src/serde/physical_plan/mod.rs
index d081422a..610fe5fa 100644
--- a/ballista/core/src/serde/physical_plan/mod.rs
+++ b/ballista/core/src/serde/physical_plan/mod.rs
@@ -18,21 +18,23 @@
use std::convert::TryInto;
use std::sync::Arc;
+use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
use prost::bytes::BufMut;
use prost::Message;
use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::config::ConfigOptions;
+use datafusion::datasource::file_format::file_type::FileCompressionType;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::runtime_env::RuntimeEnv;
-use datafusion::logical_plan::window_frames::WindowFrame;
-use datafusion::logical_plan::FunctionRegistry;
+use datafusion::execution::FunctionRegistry;
+use datafusion::logical_expr::WindowFrame;
use datafusion::physical_plan::aggregates::{create_aggregate_expr,
AggregateMode};
use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use datafusion::physical_plan::cross_join::CrossJoinExec;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::explain::ExplainExec;
use datafusion::physical_plan::expressions::{Column, PhysicalSortExpr};
@@ -40,8 +42,8 @@ use datafusion::physical_plan::file_format::{
AvroExec, CsvExec, FileScanConfig, ParquetExec,
};
use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode};
-use datafusion::physical_plan::join_utils::{ColumnIndex, JoinFilter};
+use datafusion::physical_plan::joins::CrossJoinExec;
+use datafusion::physical_plan::joins::{HashJoinExec, PartitionMode};
use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::repartition::RepartitionExec;
@@ -53,6 +55,7 @@ use datafusion::physical_plan::{
AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, WindowExpr,
};
use datafusion_proto::from_proto::parse_expr;
+use parking_lot::RwLock;
use crate::error::BallistaError;
use crate::execution_plans::{
@@ -159,6 +162,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
decode_scan_config(scan.base_conf.as_ref().unwrap())?,
scan.has_header,
str_to_byte(&scan.delimiter)?,
+ FileCompressionType::UNCOMPRESSED,
))),
PhysicalPlanType::ParquetScan(scan) => {
let predicate = scan
@@ -311,7 +315,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
&[window_node_expr],
&[],
&[],
- Some(WindowFrame::default()),
+ Some(Arc::new(WindowFrame::default())),
&physical_schema,
)?)
}
@@ -1259,6 +1263,7 @@ fn decode_scan_config(
};
Ok(FileScanConfig {
+ config_options: Arc::new(RwLock::new(ConfigOptions::new())), // TODO
add serde
object_store_url,
file_schema: schema,
file_groups,
@@ -1289,10 +1294,11 @@ mod roundtrip_tests {
use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::datatypes::IntervalUnit;
+ use datafusion::config::ConfigOptions;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::ExecutionProps;
+ use datafusion::logical_expr::create_udf;
use datafusion::logical_expr::{BuiltinScalarFunction, Volatility};
- use datafusion::logical_plan::create_udf;
use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
use datafusion::physical_expr::ScalarFunctionExpr;
use datafusion::physical_plan::aggregates::PhysicalGroupBy;
@@ -1305,7 +1311,7 @@ mod roundtrip_tests {
datatypes::{DataType, Field, Schema},
},
datasource::listing::PartitionedFile,
- logical_plan::{JoinType, Operator},
+ logical_expr::{JoinType, Operator},
physical_plan::{
aggregates::{AggregateExec, AggregateMode},
empty::EmptyExec,
@@ -1313,7 +1319,7 @@ mod roundtrip_tests {
expressions::{Avg, Column, DistinctCount, PhysicalSortExpr},
file_format::{FileScanConfig, ParquetExec},
filter::FilterExec,
- hash_join::{HashJoinExec, PartitionMode},
+ joins::{HashJoinExec, PartitionMode},
limit::{GlobalLimitExec, LocalLimitExec},
sorts::sort::SortExec,
AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr,
Statistics,
@@ -1326,6 +1332,7 @@ mod roundtrip_tests {
use crate::serde::protobuf::PhysicalPlanNode;
use crate::serde::{AsExecutionPlan, BallistaCodec};
use datafusion_proto::protobuf::LogicalPlanNode;
+ use parking_lot::RwLock;
use super::super::super::error::Result;
use super::super::protobuf;
@@ -1569,6 +1576,7 @@ mod roundtrip_tests {
#[test]
fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
let scan_config = FileScanConfig {
+ config_options: Arc::new(RwLock::new(ConfigOptions::new())), //
TODO add serde
object_store_url: ObjectStoreUrl::local_filesystem(),
file_schema: Arc::new(Schema::new(vec![Field::new(
"col",
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 33047992..436a0c45 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -30,7 +30,7 @@ use datafusion::execution::context::{
QueryPlanner, SessionConfig, SessionContext, SessionState,
};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
-use datafusion::logical_plan::LogicalPlan;
+use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::aggregates::AggregateExec;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -38,7 +38,7 @@ use datafusion::physical_plan::common::batch_byte_size;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::file_format::{CsvExec, ParquetExec};
use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::hash_join::HashJoinExec;
+use datafusion::physical_plan::joins::HashJoinExec;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml
index bf09d2be..6db40c6f 100644
--- a/ballista/executor/Cargo.toml
+++ b/ballista/executor/Cargo.toml
@@ -35,15 +35,15 @@ default = ["mimalloc"]
[dependencies]
anyhow = "1"
-arrow = { version = "24.0.0" }
-arrow-flight = { version = "24.0.0" }
+arrow = { version = "25.0.0" }
+arrow-flight = { version = "25.0.0" }
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.9.0" }
chrono = { version = "0.4", default-features = false }
configure_me = "0.4.0"
dashmap = "5.4.0"
-datafusion = "13.0.0"
-datafusion-proto = "13.0.0"
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
futures = "0.3"
hyper = "0.14.4"
log = "0.4"
diff --git a/ballista/scheduler/Cargo.toml b/ballista/scheduler/Cargo.toml
index ea996c31..82bc79d0 100644
--- a/ballista/scheduler/Cargo.toml
+++ b/ballista/scheduler/Cargo.toml
@@ -38,7 +38,7 @@ sled = ["sled_package", "tokio-stream"]
[dependencies]
anyhow = "1"
-arrow-flight = { version = "24.0.0", features = ["flight-sql-experimental"] }
+arrow-flight = { version = "25.0.0", features = ["flight-sql-experimental"] }
async-recursion = "1.0.0"
async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.9.0" }
@@ -46,8 +46,8 @@ base64 = { version = "0.13", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = "0.4.0"
dashmap = "5.4.0"
-datafusion = "13.0.0"
-datafusion-proto = "13.0.0"
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
etcd-client = { version = "0.10", optional = true }
flatbuffers = { version = "22.9.29" }
futures = "0.3"
diff --git a/ballista/scheduler/src/display.rs
b/ballista/scheduler/src/display.rs
index 23753889..7df23c22 100644
--- a/ballista/scheduler/src/display.rs
+++ b/ballista/scheduler/src/display.rs
@@ -20,7 +20,7 @@
//! format
use ballista_core::utils::collect_plan_metrics;
-use datafusion::logical_plan::{StringifiedPlan, ToStringifiedPlan};
+use datafusion::logical_expr::{StringifiedPlan, ToStringifiedPlan};
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::{
accept, DisplayFormatType, ExecutionPlan, ExecutionPlanVisitor,
@@ -153,7 +153,7 @@ impl<'a, 'b> ExecutionPlanVisitor for IndentVisitor<'a, 'b>
{
impl<'a> ToStringifiedPlan for DisplayableBallistaExecutionPlan<'a> {
fn to_stringified(
&self,
- plan_type: datafusion::logical_plan::PlanType,
+ plan_type: datafusion::logical_expr::PlanType,
) -> StringifiedPlan {
StringifiedPlan::new(plan_type, self.indent().to_string())
}
diff --git a/ballista/scheduler/src/planner.rs
b/ballista/scheduler/src/planner.rs
index c91edd34..4265d5bd 100644
--- a/ballista/scheduler/src/planner.rs
+++ b/ballista/scheduler/src/planner.rs
@@ -326,7 +326,7 @@ mod test {
use ballista_core::serde::{protobuf, AsExecutionPlan, BallistaCodec};
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
- use datafusion::physical_plan::hash_join::HashJoinExec;
+ use datafusion::physical_plan::joins::HashJoinExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{
coalesce_partitions::CoalescePartitionsExec,
projection::ProjectionExec,
diff --git a/ballista/scheduler/src/scheduler_server/event.rs
b/ballista/scheduler/src/scheduler_server/event.rs
index f206594f..544976cd 100644
--- a/ballista/scheduler/src/scheduler_server/event.rs
+++ b/ballista/scheduler/src/scheduler_server/event.rs
@@ -17,7 +17,7 @@
use crate::state::executor_manager::ExecutorReservation;
-use datafusion::logical_plan::LogicalPlan;
+use datafusion::logical_expr::LogicalPlan;
use crate::state::execution_graph::RunningTaskInfo;
use ballista_core::serde::protobuf::TaskStatus;
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs
b/ballista/scheduler/src/scheduler_server/mod.rs
index 44fed7db..661725af 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -26,7 +26,7 @@ use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
use ballista_core::utils::default_session_builder;
use datafusion::execution::context::SessionState;
-use datafusion::logical_plan::LogicalPlan;
+use datafusion::logical_expr::LogicalPlan;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_proto::logical_plan::AsLogicalPlan;
@@ -303,7 +303,7 @@ mod test {
use std::time::Duration;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
- use datafusion::logical_plan::{col, sum, LogicalPlan};
+ use datafusion::logical_expr::{col, sum, LogicalPlan};
use datafusion::test_util::scan_empty;
use datafusion_proto::protobuf::LogicalPlanNode;
diff --git a/ballista/scheduler/src/state/execution_graph.rs
b/ballista/scheduler/src/state/execution_graph.rs
index 5702bed1..dcb919b1 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -1581,8 +1581,8 @@ mod test {
use std::sync::Arc;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
+ use datafusion::logical_expr::JoinType;
use datafusion::logical_expr::{col, count, sum, Expr};
- use datafusion::logical_plan::JoinType;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::test_util::scan_empty;
diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs
b/ballista/scheduler/src/state/execution_graph_dot.rs
index d46e93f8..0cdbb39c 100644
--- a/ballista/scheduler/src/state/execution_graph_dot.rs
+++ b/ballista/scheduler/src/state/execution_graph_dot.rs
@@ -25,12 +25,12 @@ use datafusion::datasource::listing::PartitionedFile;
use datafusion::physical_plan::aggregates::AggregateExec;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use datafusion::physical_plan::cross_join::CrossJoinExec;
use datafusion::physical_plan::file_format::{
AvroExec, CsvExec, FileScanConfig, NdJsonExec, ParquetExec,
};
use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::hash_join::HashJoinExec;
+use datafusion::physical_plan::joins::CrossJoinExec;
+use datafusion::physical_plan::joins::HashJoinExec;
use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::projection::ProjectionExec;
diff --git a/ballista/scheduler/src/state/mod.rs
b/ballista/scheduler/src/state/mod.rs
index cb723c9a..de20dcdd 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -36,7 +36,7 @@ use crate::state::execution_graph::TaskDescription;
use ballista_core::error::{BallistaError, Result};
use ballista_core::serde::protobuf::TaskStatus;
use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
-use datafusion::logical_plan::LogicalPlan;
+use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_proto::logical_plan::AsLogicalPlan;
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 9c66c1bc..135a334c 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -34,8 +34,8 @@ snmalloc = ["snmalloc-rs"]
[dependencies]
ballista = { path = "../ballista/client", version = "0.9.0" }
-datafusion = "13.0.0"
-datafusion-proto = "13.0.0"
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev =
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
env_logger = "0.9"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 68d350be..c262c3b6 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -28,7 +28,7 @@ use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionState;
-use datafusion::logical_plan::LogicalPlan;
+use datafusion::logical_expr::LogicalPlan;
use datafusion::parquet::basic::Compression;
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
@@ -980,8 +980,7 @@ mod tests {
use super::*;
use datafusion::arrow::array::*;
use datafusion::arrow::util::display::array_value_to_string;
- use datafusion::logical_plan::Expr;
- use datafusion::logical_plan::Expr::Cast;
+ use datafusion::logical_expr::{expr::Cast, Expr};
use std::env;
use std::sync::Arc;
@@ -1469,10 +1468,10 @@ mod tests {
.iter()
.map(|field| {
Expr::Alias(
- Box::new(Cast {
+ Box::new(Expr::Cast(Cast {
expr: Box::new(trim(col(Field::name(field)))),
data_type: Field::data_type(field).to_owned(),
- }),
+ })),
Field::name(field).to_string(),
)
})
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 2a1c6871..0f551528 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
[dependencies]
ballista = { path = "../ballista/client", version = "0.9.0" }
-datafusion = "13.0.0"
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567" }
futures = "0.3"
num_cpus = "1.13.0"
prost = "0.11"
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 1136f762..2c0d67c8 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -36,7 +36,7 @@ default = ["mimalloc"]
[dependencies]
async-trait = "0.1"
ballista = { path = "../ballista/client", version = "0.9.0" }
-datafusion = { version = "13.0.0", features = ["pyarrow"] }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev =
"fa5cd7f30d785c5b3355e425e082a9d5a91bf567", features = ["pyarrow"] }
futures = "0.3"
mimalloc = { version = "*", optional = true, default-features = false }
pyo3 = { version = "~0.17.1", features = ["extension-module", "abi3",
"abi3-py37"] }
diff --git a/python/ballista/tests/test_catalog.py
b/python/ballista/tests/test_catalog.py
index e53dd7ac..d9043856 100644
--- a/python/ballista/tests/test_catalog.py
+++ b/python/ballista/tests/test_catalog.py
@@ -65,8 +65,8 @@ def test_basic(ctx, database):
assert table.kind == "physical"
assert table.schema == pa.schema(
[
- pa.field("int", pa.int64(), nullable=False),
- pa.field("str", pa.string(), nullable=False),
- pa.field("float", pa.float64(), nullable=False),
+ pa.field("int", pa.int64(), nullable=True),
+ pa.field("str", pa.string(), nullable=True),
+ pa.field("float", pa.float64(), nullable=True),
]
)
diff --git a/python/src/dataframe.rs b/python/src/dataframe.rs
index bc1a9a68..3ee7680c 100644
--- a/python/src/dataframe.rs
+++ b/python/src/dataframe.rs
@@ -21,7 +21,7 @@ use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::pyarrow::{PyArrowConvert, PyArrowException,
PyArrowType};
use datafusion::arrow::util::pretty;
use datafusion::dataframe::DataFrame;
-use datafusion::logical_plan::JoinType;
+use datafusion::logical_expr::JoinType;
use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
use pyo3::types::PyTuple;
diff --git a/python/src/dataset_exec.rs b/python/src/dataset_exec.rs
index 987b7ad1..f238d54a 100644
--- a/python/src/dataset_exec.rs
+++ b/python/src/dataset_exec.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use datafusion::optimizer::utils::conjunction;
/// Implements a Datafusion physical ExecutionPlan that delegates to a PyArrow
Dataset
/// This actually performs the projection, filtering and scanning of a Dataset
use pyo3::prelude::*;
@@ -32,7 +33,7 @@ use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError as InnerDataFusionError, Result as
DFResult};
use datafusion::execution::context::TaskContext;
-use datafusion::logical_plan::{combine_filters, Expr};
+use datafusion::logical_expr::Expr;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
@@ -93,9 +94,9 @@ impl DatasetExec {
.collect()
});
let columns: Option<Vec<String>> = columns.transpose()?;
- let filter_expr: Option<PyObject> = combine_filters(filters)
- .map(|filters| {
- PyArrowFilterExpression::try_from(&filters)
+ let filter_expr: Option<PyObject> =
conjunction(filters.iter().cloned())
+ .map(|filter| {
+ PyArrowFilterExpression::try_from(&filter)
.map(|filter_expr| filter_expr.inner().clone_ref(py))
})
.transpose()?;
diff --git a/python/src/expression.rs b/python/src/expression.rs
index aa6e540c..03c32fe4 100644
--- a/python/src/expression.rs
+++ b/python/src/expression.rs
@@ -15,12 +15,13 @@
// specific language governing permissions and limitations
// under the License.
+use datafusion::logical_expr::expr::Cast;
use pyo3::{basic::CompareOp, prelude::*};
use std::convert::{From, Into};
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::pyarrow::PyArrowType;
-use datafusion::logical_plan::{col, lit, Expr};
+use datafusion::logical_expr::{col, lit, Expr, GetIndexedField};
use datafusion::scalar::ScalarValue;
/// An PyExpr that can be used on a DataFrame
@@ -93,10 +94,10 @@ impl PyExpr {
}
fn __getitem__(&self, key: &str) -> PyResult<PyExpr> {
- Ok(Expr::GetIndexedField {
+ Ok(Expr::GetIndexedField(GetIndexedField {
expr: Box::new(self.expr.clone()),
key: ScalarValue::Utf8(Some(key.to_string())),
- }
+ })
.into())
}
@@ -128,10 +129,10 @@ impl PyExpr {
pub fn cast(&self, to: PyArrowType<DataType>) -> PyExpr {
// self.expr.cast_to() requires DFSchema to validate that the cast
// is supported, omit that for now
- let expr = Expr::Cast {
+ let expr = Expr::Cast(Cast {
expr: Box::new(self.expr.clone()),
data_type: to.0,
- };
+ });
expr.into()
}
}
diff --git a/python/src/functions.rs b/python/src/functions.rs
index 2d0550d2..be6fff25 100644
--- a/python/src/functions.rs
+++ b/python/src/functions.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use datafusion::prelude::lit;
use pyo3::{prelude::*, wrap_pyfunction};
use datafusion::logical_expr::{self, BuiltinScalarFunction, WindowFunction};
@@ -80,7 +81,7 @@ fn concat(args: Vec<PyExpr>) -> PyResult<PyExpr> {
#[pyfunction(sep, args = "*")]
fn concat_ws(sep: String, args: Vec<PyExpr>) -> PyResult<PyExpr> {
let args = args.into_iter().map(|e| e.expr).collect::<Vec<_>>();
- Ok(logical_expr::concat_ws(sep, &args).into())
+ Ok(logical_expr::concat_ws(lit(sep), args).into())
}
/// Creates a new Sort expression
diff --git a/python/src/pyarrow_filter_expression.rs
b/python/src/pyarrow_filter_expression.rs
index cbe2b801..99047c14 100644
--- a/python/src/pyarrow_filter_expression.rs
+++ b/python/src/pyarrow_filter_expression.rs
@@ -150,7 +150,7 @@ impl TryFrom<&Expr> for PyArrowFilterExpression {
v
))),
},
- Expr::BinaryExpr { left, op, right } => {
+ Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
let operator = operator_to_py(op, op_module)?;
let left =
PyArrowFilterExpression::try_from(left.as_ref())?.0;
let right =
PyArrowFilterExpression::try_from(right.as_ref())?.0;
@@ -173,12 +173,12 @@ impl TryFrom<&Expr> for PyArrowFilterExpression {
.into_ref(py);
Ok(expr.call_method1("is_null", (expr,))?)
}
- Expr::Between {
+ Expr::Between(Between {
expr,
negated,
low,
high,
- } => {
+ }) => {
let expr =
PyArrowFilterExpression::try_from(expr.as_ref())?.0;
let low =
PyArrowFilterExpression::try_from(low.as_ref())?.0;
let high =
PyArrowFilterExpression::try_from(high.as_ref())?.0;