This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 167bbf9d Update DataFusion to 43 (#1125)
167bbf9d is described below
commit 167bbf9deab74ebf453675606f43c9e62894bf90
Author: Daniƫl Heres <[email protected]>
AuthorDate: Thu Dec 19 15:56:30 2024 +0100
Update DataFusion to 43 (#1125)
* Update DataFusions to 43
* Fmt
* Debug
* Remove comment
* Update proto
* Update common proto as well
---
Cargo.toml | 9 +-
ballista/client/tests/context_setup.rs | 2 +-
ballista/core/proto/datafusion.proto | 174 ++++++++++++-------------
ballista/core/proto/datafusion_common.proto | 9 +-
ballista/core/src/extension.rs | 2 +-
ballista/core/src/utils.rs | 15 ++-
ballista/executor/src/executor_process.rs | 2 +-
ballista/executor/src/standalone.rs | 2 +-
ballista/scheduler/src/test_utils.rs | 5 +-
benchmarks/src/bin/tpch.rs | 2 +-
docs/source/user-guide/extending-components.md | 2 +-
examples/src/object_store.rs | 2 +-
examples/tests/object_store.rs | 10 +-
13 files changed, 119 insertions(+), 117 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index f9206458..a9e9556f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,11 +26,10 @@ arrow-flight = { version = "53", features =
["flight-sql-experimental"] }
clap = { version = "4.5", features = ["derive", "cargo"] }
configure_me = { version = "0.4.0" }
configure_me_codegen = { version = "0.4.4" }
-# bump directly to datafusion v43 to avoid the serde bug on v42
(https://github.com/apache/datafusion/pull/12626)
-datafusion = "42.0.0"
-datafusion-cli = "42.0.0"
-datafusion-proto = "42.0.0"
-datafusion-proto-common = "42.0.0"
+datafusion = "43.0.0"
+datafusion-cli = "43.0.0"
+datafusion-proto = "43.0.0"
+datafusion-proto-common = "43.0.0"
object_store = "0.11"
prost = "0.13"
prost-types = "0.13"
diff --git a/ballista/client/tests/context_setup.rs
b/ballista/client/tests/context_setup.rs
index 4dc6050d..7806d847 100644
--- a/ballista/client/tests/context_setup.rs
+++ b/ballista/client/tests/context_setup.rs
@@ -380,7 +380,7 @@ mod standalone {
}
}
- #[derive(Default)]
+ #[derive(Debug, Default)]
struct BadPlanner {}
#[async_trait::async_trait]
diff --git a/ballista/core/proto/datafusion.proto
b/ballista/core/proto/datafusion.proto
index cf166ba9..5a044cb4 100644
--- a/ballista/core/proto/datafusion.proto
+++ b/ballista/core/proto/datafusion.proto
@@ -75,10 +75,6 @@ message LogicalExprNodeCollection {
repeated LogicalExprNode logical_expr_nodes = 1;
}
-message SortExprNodeCollection {
- repeated SortExprNode sort_expr_nodes = 1;
-}
-
message ListingTableScanNode {
reserved 1; // was string table_name
TableReference table_name = 14;
@@ -94,9 +90,8 @@ message ListingTableScanNode {
datafusion_common.CsvFormat csv = 10;
datafusion_common.ParquetFormat parquet = 11;
datafusion_common.AvroFormat avro = 12;
- datafusion_common.NdJsonFormat json = 15;
}
- repeated SortExprNodeCollection file_sort_order = 13;
+ repeated LogicalExprNodeCollection file_sort_order = 13;
}
message ViewTableScanNode {
@@ -133,7 +128,7 @@ message SelectionNode {
message SortNode {
LogicalPlanNode input = 1;
- repeated SortExprNode expr = 2;
+ repeated LogicalExprNode expr = 2;
// Maximum number of highest/lowest rows to fetch; negative means no limit
int64 fetch = 3;
}
@@ -164,12 +159,12 @@ message CreateExternalTableNode {
repeated string table_partition_cols = 5;
bool if_not_exists = 6;
string definition = 7;
- repeated SortExprNodeCollection order_exprs = 10;
+ repeated LogicalExprNodeCollection order_exprs = 10;
bool unbounded = 11;
map<string, string> options = 8;
datafusion_common.Constraints constraints = 12;
map<string, LogicalExprNode> column_defaults = 13;
-}
+ }
message PrepareNode {
string name = 1;
@@ -249,51 +244,35 @@ message DistinctNode {
message DistinctOnNode {
repeated LogicalExprNode on_expr = 1;
repeated LogicalExprNode select_expr = 2;
- repeated SortExprNode sort_expr = 3;
+ repeated LogicalExprNode sort_expr = 3;
LogicalPlanNode input = 4;
}
message CopyToNode {
- LogicalPlanNode input = 1;
- string output_url = 2;
- bytes file_type = 3;
- repeated string partition_by = 7;
+ LogicalPlanNode input = 1;
+ string output_url = 2;
+ oneof format_options {
+ datafusion_common.CsvOptions csv = 8;
+ datafusion_common.JsonOptions json = 9;
+ datafusion_common.TableParquetOptions parquet = 10;
+ datafusion_common.AvroOptions avro = 11;
+ datafusion_common.ArrowOptions arrow = 12;
+ }
+ repeated string partition_by = 7;
}
message UnnestNode {
- LogicalPlanNode input = 1;
- repeated ColumnUnnestExec exec_columns = 2;
- repeated ColumnUnnestListItem list_type_columns = 3;
- repeated uint64 struct_type_columns = 4;
- repeated uint64 dependency_indices = 5;
- datafusion_common.DfSchema schema = 6;
- UnnestOptions options = 7;
-}
-message ColumnUnnestListItem {
- uint32 input_index = 1;
- ColumnUnnestListRecursion recursion = 2;
-}
-
-message ColumnUnnestListRecursions {
- repeated ColumnUnnestListRecursion recursions = 2;
-}
-
-message ColumnUnnestListRecursion {
- datafusion_common.Column output_column = 1;
- uint32 depth = 2;
-}
-
-message ColumnUnnestExec {
- datafusion_common.Column column = 1;
- oneof UnnestType {
- ColumnUnnestListRecursions list = 2;
- datafusion_common.EmptyMessage struct = 3;
- datafusion_common.EmptyMessage inferred = 4;
- }
+ LogicalPlanNode input = 1;
+ repeated datafusion_common.Column exec_columns = 2;
+ repeated uint64 list_type_columns = 3;
+ repeated uint64 struct_type_columns = 4;
+ repeated uint64 dependency_indices = 5;
+ datafusion_common.DfSchema schema = 6;
+ UnnestOptions options = 7;
}
message UnnestOptions {
- bool preserve_nulls = 1;
+ bool preserve_nulls = 1;
}
message UnionNode {
@@ -337,6 +316,8 @@ message LogicalExprNode {
// binary expressions
BinaryExprNode binary_expr = 4;
+ // aggregate expressions
+ AggregateExprNode aggregate_expr = 5;
// null checks
IsNull is_null_expr = 6;
@@ -346,6 +327,7 @@ message LogicalExprNode {
BetweenNode between = 9;
CaseNode case_ = 10;
CastNode cast = 11;
+ SortExprNode sort = 12;
NegativeNode negative = 13;
InListNode in_list = 14;
Wildcard wildcard = 15;
@@ -387,7 +369,7 @@ message LogicalExprNode {
}
message Wildcard {
- TableReference qualifier = 1;
+ string qualifier = 1;
}
message PlaceholderNode {
@@ -489,14 +471,57 @@ message InListNode {
bool negated = 3;
}
+enum AggregateFunction {
+ MIN = 0;
+ MAX = 1;
+ SUM = 2;
+ AVG = 3;
+ COUNT = 4;
+ APPROX_DISTINCT = 5;
+ ARRAY_AGG = 6;
+ // VARIANCE = 7;
+ VARIANCE_POP = 8;
+ // COVARIANCE = 9;
+ // COVARIANCE_POP = 10;
+ STDDEV = 11;
+ STDDEV_POP = 12;
+ CORRELATION = 13;
+ APPROX_PERCENTILE_CONT = 14;
+ APPROX_MEDIAN = 15;
+ APPROX_PERCENTILE_CONT_WITH_WEIGHT = 16;
+ GROUPING = 17;
+ // MEDIAN = 18;
+ BIT_AND = 19;
+ BIT_OR = 20;
+ BIT_XOR = 21;
+ BOOL_AND = 22;
+ BOOL_OR = 23;
+ REGR_SLOPE = 26;
+ REGR_INTERCEPT = 27;
+ REGR_COUNT = 28;
+ REGR_R2 = 29;
+ REGR_AVGX = 30;
+ REGR_AVGY = 31;
+ REGR_SXX = 32;
+ REGR_SYY = 33;
+ REGR_SXY = 34;
+ STRING_AGG = 35;
+ NTH_VALUE_AGG = 36;
+}
+
+message AggregateExprNode {
+ AggregateFunction aggr_function = 1;
+ repeated LogicalExprNode expr = 2;
+ bool distinct = 3;
+ LogicalExprNode filter = 4;
+ repeated LogicalExprNode order_by = 5;
+}
message AggregateUDFExprNode {
string fun_name = 1;
repeated LogicalExprNode args = 2;
- bool distinct = 5;
LogicalExprNode filter = 3;
- repeated SortExprNode order_by = 4;
- optional bytes fun_definition = 6;
+ repeated LogicalExprNode order_by = 4;
}
message ScalarUDFExprNode {
@@ -506,8 +531,7 @@ message ScalarUDFExprNode {
}
enum BuiltInWindowFunction {
- UNSPECIFIED = 0; //
https://protobuf.dev/programming-guides/dos-donts/#unspecified-enum
- // ROW_NUMBER = 0;
+ ROW_NUMBER = 0;
RANK = 1;
DENSE_RANK = 2;
PERCENT_RANK = 3;
@@ -522,16 +546,16 @@ enum BuiltInWindowFunction {
message WindowExprNode {
oneof window_function {
+ AggregateFunction aggr_function = 1;
BuiltInWindowFunction built_in_function = 2;
string udaf = 3;
string udwf = 9;
}
LogicalExprNode expr = 4;
repeated LogicalExprNode partition_by = 5;
- repeated SortExprNode order_by = 6;
+ repeated LogicalExprNode order_by = 6;
// repeated LogicalExprNode filter = 7;
WindowFrame window_frame = 8;
- optional bytes fun_definition = 10;
}
message BetweenNode {
@@ -650,11 +674,9 @@ message PlanType {
datafusion_common.EmptyMessage FinalLogicalPlan = 3;
datafusion_common.EmptyMessage InitialPhysicalPlan = 4;
datafusion_common.EmptyMessage InitialPhysicalPlanWithStats = 9;
- datafusion_common.EmptyMessage InitialPhysicalPlanWithSchema = 11;
OptimizedPhysicalPlanType OptimizedPhysicalPlan = 5;
datafusion_common.EmptyMessage FinalPhysicalPlan = 6;
datafusion_common.EmptyMessage FinalPhysicalPlanWithStats = 10;
- datafusion_common.EmptyMessage FinalPhysicalPlanWithSchema = 12;
}
}
@@ -715,11 +737,10 @@ message PhysicalPlanNode {
AnalyzeExecNode analyze = 23;
JsonSinkExecNode json_sink = 24;
SymmetricHashJoinExecNode symmetric_hash_join = 25;
- InterleaveExecNode interleave = 26;
+ InterleaveExecNode interleave = 26;
PlaceholderRowExecNode placeholder_row = 27;
CsvSinkExecNode csv_sink = 28;
ParquetSinkExecNode parquet_sink = 29;
- UnnestExecNode unnest = 30;
}
}
@@ -731,21 +752,13 @@ message PartitionColumn {
message FileSinkConfig {
reserved 6; // writer_mode
- reserved 8; // was `overwrite` which has been superseded by `insert_op`
string object_store_url = 1;
repeated PartitionedFile file_groups = 2;
repeated string table_paths = 3;
datafusion_common.Schema output_schema = 4;
repeated PartitionColumn table_partition_cols = 5;
- bool keep_partition_by_columns = 9;
- InsertOp insert_op = 10;
-}
-
-enum InsertOp {
- Append = 0;
- Overwrite = 1;
- Replace = 2;
+ bool overwrite = 8;
}
message JsonSink {
@@ -784,19 +797,6 @@ message ParquetSinkExecNode {
PhysicalSortExprNodeCollection sort_order = 4;
}
-message UnnestExecNode {
- PhysicalPlanNode input = 1;
- datafusion_common.Schema schema = 2;
- repeated ListUnnest list_type_columns = 3;
- repeated uint64 struct_type_columns = 4;
- UnnestOptions options = 5;
-}
-
-message ListUnnest {
- uint32 index_in_input_schema = 1;
- uint32 depth = 2;
-}
-
message PhysicalExtensionNode {
bytes node = 1;
repeated PhysicalPlanNode inputs = 2;
@@ -838,8 +838,6 @@ message PhysicalExprNode {
// was PhysicalDateTimeIntervalExprNode date_time_interval_expr = 17;
PhysicalLikeExprNode like_expr = 18;
-
- PhysicalExtensionExprNode extension = 19;
}
}
@@ -852,17 +850,17 @@ message PhysicalScalarUdfNode {
message PhysicalAggregateExprNode {
oneof AggregateFunction {
+ AggregateFunction aggr_function = 1;
string user_defined_aggr_function = 4;
}
repeated PhysicalExprNode expr = 2;
repeated PhysicalSortExprNode ordering_req = 5;
bool distinct = 3;
- bool ignore_nulls = 6;
- optional bytes fun_definition = 7;
}
message PhysicalWindowExprNode {
oneof window_function {
+ AggregateFunction aggr_function = 1;
BuiltInWindowFunction built_in_function = 2;
string user_defined_aggr_function = 3;
}
@@ -871,7 +869,6 @@ message PhysicalWindowExprNode {
repeated PhysicalSortExprNode order_by = 6;
WindowFrame window_frame = 7;
string name = 8;
- optional bytes fun_definition = 9;
}
message PhysicalIsNull {
@@ -947,16 +944,10 @@ message PhysicalNegativeNode {
PhysicalExprNode expr = 1;
}
-message PhysicalExtensionExprNode {
- bytes expr = 1;
- repeated PhysicalExprNode inputs = 2;
-}
-
message FilterExecNode {
PhysicalPlanNode input = 1;
PhysicalExprNode expr = 2;
uint32 default_filter_selectivity = 3;
- repeated uint32 projection = 9;
}
message FileGroup {
@@ -1003,10 +994,6 @@ message CsvScanExecNode {
oneof optional_escape {
string escape = 5;
}
- oneof optional_comment {
- string comment = 6;
- }
- bool newlines_in_values = 7;
}
message AvroScanExecNode {
@@ -1187,7 +1174,6 @@ message NestedLoopJoinExecNode {
message CoalesceBatchesExecNode {
PhysicalPlanNode input = 1;
uint32 target_batch_size = 2;
- optional uint32 fetch = 3;
}
message CoalescePartitionsExecNode {
diff --git a/ballista/core/proto/datafusion_common.proto
b/ballista/core/proto/datafusion_common.proto
index c3906abf..94490ec2 100644
--- a/ballista/core/proto/datafusion_common.proto
+++ b/ballista/core/proto/datafusion_common.proto
@@ -84,6 +84,7 @@ enum JoinType {
LEFTANTI = 5;
RIGHTSEMI = 6;
RIGHTANTI = 7;
+ LEFTMARK = 8;
}
enum JoinConstraint {
@@ -413,7 +414,7 @@ message CsvOptions {
bytes quote = 3; // Quote character as a byte
bytes escape = 4; // Optional escape character as a byte
CompressionTypeVariant compression = 5; // Compression type
- uint64 schema_infer_max_rec = 6; // Max records for schema inference
+ optional uint64 schema_infer_max_rec = 6; // Optional max records for schema
inference
string date_format = 7; // Optional date format
string datetime_format = 8; // Optional datetime format
string timestamp_format = 9; // Optional timestamp format
@@ -429,7 +430,7 @@ message CsvOptions {
// Options controlling CSV format
message JsonOptions {
CompressionTypeVariant compression = 1; // Compression type
- uint64 schema_infer_max_rec = 2; // Max records for schema inference
+ optional uint64 schema_infer_max_rec = 2; // Optional max records for schema
inference
}
message TableParquetOptions {
@@ -494,6 +495,7 @@ message ParquetOptions {
bool bloom_filter_on_read = 26; // default = true
bool bloom_filter_on_write = 27; // default = false
bool schema_force_view_types = 28; // default = false
+ bool binary_as_string = 29; // default = false
oneof metadata_size_hint_opt {
uint64 metadata_size_hint = 4;
@@ -540,9 +542,10 @@ message ParquetOptions {
string created_by = 16;
}
-enum JoinSide{
+enum JoinSide {
LEFT_SIDE = 0;
RIGHT_SIDE = 1;
+ NONE = 2;
}
message Precision{
diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs
index bb43e93b..25bdbad9 100644
--- a/ballista/core/src/extension.rs
+++ b/ballista/core/src/extension.rs
@@ -133,7 +133,7 @@ impl SessionStateExt for SessionState {
.with_round_robin_repartition(false);
let runtime_config = RuntimeConfig::default();
- let runtime_env = RuntimeEnv::new(runtime_config)?;
+ let runtime_env = RuntimeEnv::try_new(runtime_config)?;
let session_state = SessionStateBuilder::new()
.with_default_features()
.with_config(session_config)
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 14eeb9a2..913e955d 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -56,7 +56,7 @@ pub fn default_session_builder(
Ok(SessionStateBuilder::new()
.with_default_features()
.with_config(config)
- .with_runtime_env(Arc::new(RuntimeEnv::new(RuntimeConfig::default())?))
+
.with_runtime_env(Arc::new(RuntimeEnv::try_new(RuntimeConfig::default())?))
.build())
}
@@ -125,6 +125,17 @@ pub struct BallistaQueryPlanner<T: AsLogicalPlan> {
plan_repr: PhantomData<T>,
}
+impl<T: AsLogicalPlan> std::fmt::Debug for BallistaQueryPlanner<T> {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("BallistaQueryPlanner")
+ .field("scheduler_url", &self.scheduler_url)
+ .field("config", &self.config)
+ .field("extension_codec", &self.extension_codec)
+ .field("plan_repr", &self.plan_repr)
+ .finish()
+ }
+}
+
impl<T: 'static + AsLogicalPlan> BallistaQueryPlanner<T> {
pub fn new(scheduler_url: String, config: BallistaConfig) -> Self {
Self {
@@ -316,7 +327,7 @@ mod test {
use crate::utils::LocalRun;
fn context() -> SessionContext {
- let runtime_environment =
RuntimeEnv::new(RuntimeConfig::new()).unwrap();
+ let runtime_environment =
RuntimeEnv::try_new(RuntimeConfig::new()).unwrap();
let session_config =
SessionConfig::new().with_information_schema(true);
diff --git a/ballista/executor/src/executor_process.rs
b/ballista/executor/src/executor_process.rs
index e350f391..fac02b48 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -204,7 +204,7 @@ pub async fn start_executor_process(
opt.override_runtime_producer.clone().unwrap_or_else(|| {
Arc::new(move |_| {
let config =
RuntimeConfig::new().with_temp_file_path(wd.clone());
- Ok(Arc::new(RuntimeEnv::new(config)?))
+ Ok(Arc::new(RuntimeEnv::try_new(config)?))
})
});
diff --git a/ballista/executor/src/standalone.rs
b/ballista/executor/src/standalone.rs
index 2c2906f0..57082fc2 100644
--- a/ballista/executor/src/standalone.rs
+++ b/ballista/executor/src/standalone.rs
@@ -185,7 +185,7 @@ pub async fn new_standalone_executor<
let wd = work_dir.clone();
let runtime_producer: RuntimeProducer = Arc::new(move |_: &SessionConfig| {
let config = RuntimeConfig::new().with_temp_file_path(wd.clone());
- Ok(Arc::new(RuntimeEnv::new(config)?))
+ Ok(Arc::new(RuntimeEnv::try_new(config)?))
});
let executor = Arc::new(Executor::new_basic(
diff --git a/ballista/scheduler/src/test_utils.rs
b/ballista/scheduler/src/test_utils.rs
index 629cc285..8e4565a4 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -71,6 +71,7 @@ const TEST_SCHEDULER_NAME: &str = "localhost:50050";
/// Sometimes we need to construct logical plans that will produce errors
/// when we try and create physical plan. A scan using `ExplodingTableProvider`
/// will do the trick
+#[derive(Debug)]
pub struct ExplodingTableProvider;
#[async_trait]
@@ -135,7 +136,7 @@ pub async fn datafusion_test_context(path: &str) ->
Result<SessionContext> {
let default_shuffle_partitions = 2;
let config =
SessionConfig::new().with_target_partitions(default_shuffle_partitions);
let ctx = SessionContext::new_with_config(config);
- for table in TPCH_TABLES {
+ for &table in TPCH_TABLES {
let schema = get_tpch_schema(table);
let options = CsvReadOptions::new()
.schema(&schema)
@@ -143,7 +144,7 @@ pub async fn datafusion_test_context(path: &str) ->
Result<SessionContext> {
.has_header(false)
.file_extension(".tbl");
let dir = format!("{path}/{table}");
- ctx.register_csv(table, &dir, options).await?;
+ ctx.register_csv(table, dir, options).await?;
}
Ok(ctx)
}
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 77c48bb1..72cc848d 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -562,7 +562,7 @@ async fn register_tables(
ctx: &SessionContext,
debug: bool,
) -> Result<()> {
- for table in TABLES {
+ for &table in TABLES {
match file_format {
// dbgen creates .tbl ('|' delimited) files without header
"tbl" => {
diff --git a/docs/source/user-guide/extending-components.md
b/docs/source/user-guide/extending-components.md
index 556c0a36..60de1b7b 100644
--- a/docs/source/user-guide/extending-components.md
+++ b/docs/source/user-guide/extending-components.md
@@ -74,7 +74,7 @@ pub fn custom_runtime_env_with_s3_support(
CustomObjectStoreRegistry::new(s3options.clone()),
));
- Ok(Arc::new(RuntimeEnv::new(config)?))
+ Ok(Arc::new(RuntimeEnv::try_new(config)?))
}
```
diff --git a/examples/src/object_store.rs b/examples/src/object_store.rs
index 3cd22fa6..5b5e38a6 100644
--- a/examples/src/object_store.rs
+++ b/examples/src/object_store.rs
@@ -79,7 +79,7 @@ pub fn custom_runtime_env_with_s3_support(
CustomObjectStoreRegistry::new(s3options.clone()),
));
- Ok(Arc::new(RuntimeEnv::new(config)?))
+ Ok(Arc::new(RuntimeEnv::try_new(config)?))
}
/// Custom [SessionState] constructor method
diff --git a/examples/tests/object_store.rs b/examples/tests/object_store.rs
index ca47c5cb..3c3443e8 100644
--- a/examples/tests/object_store.rs
+++ b/examples/tests/object_store.rs
@@ -60,7 +60,7 @@ mod standalone {
let test_data = examples_test_data();
let config = RuntimeConfig::new();
- let runtime_env = RuntimeEnv::new(config)?;
+ let runtime_env = RuntimeEnv::try_new(config)?;
runtime_env.register_object_store(
&format!("s3://{}", crate::common::BUCKET)
@@ -147,7 +147,7 @@ mod remote {
.map_err(|e| DataFusionError::External(e.into()))?;
let config = RuntimeConfig::new();
- let runtime_env = RuntimeEnv::new(config)?;
+ let runtime_env = RuntimeEnv::try_new(config)?;
runtime_env.register_object_store(
&format!("s3://{}", crate::common::BUCKET)
@@ -219,6 +219,7 @@ mod custom_s3_config {
use ballista_core::RuntimeProducer;
use ballista_examples::object_store::{CustomObjectStoreRegistry,
S3Options};
use ballista_examples::test_util::examples_test_data;
+ use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::SessionState;
use datafusion::prelude::SessionConfig;
use datafusion::{assert_batches_eq, prelude::SessionContext};
@@ -288,7 +289,7 @@ mod custom_s3_config {
CustomObjectStoreRegistry::new(s3options.clone()),
));
- Ok(Arc::new(RuntimeEnv::new(config)?))
+ Ok(Arc::new(RuntimeEnv::try_new(config)?))
});
// Session builder creates SessionState
@@ -494,7 +495,8 @@ mod custom_s3_config {
let config = RuntimeConfig::new().with_object_store_registry(Arc::new(
CustomObjectStoreRegistry::new(s3options.clone()),
));
- let runtime_env = RuntimeEnv::new(config)?;
+
+ let runtime_env = RuntimeEnv::try_new(config)?;
Ok(SessionStateBuilder::new()
.with_runtime_env(runtime_env.into())
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]