This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 311fa532 Update to apache/arrow-datafusion#2578 (#48)
311fa532 is described below

commit 311fa532655f1ae28dc8958036bd4e6fff80e5fa
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue May 31 12:35:20 2022 +0100

    Update to apache/arrow-datafusion#2578 (#48)
    
    * Update to https://github.com/apache/arrow-datafusion/pull/2578
    
    * Fix standalone build
    
    * Update datafusion pin
---
 ballista-cli/Cargo.lock                            | 97 +++++++++++++++++++---
 ballista-cli/Cargo.toml                            |  4 +-
 ballista/rust/client/Cargo.toml                    |  2 +-
 ballista/rust/client/src/context.rs                |  2 +-
 ballista/rust/core/Cargo.toml                      |  4 +-
 ballista/rust/core/proto/ballista.proto            |  1 +
 ballista/rust/core/src/serde/logical_plan/mod.rs   | 51 +++++++-----
 .../core/src/serde/physical_plan/from_proto.rs     |  2 +
 ballista/rust/core/src/serde/physical_plan/mod.rs  | 18 +++-
 .../rust/core/src/serde/physical_plan/to_proto.rs  |  1 +
 ballista/rust/executor/Cargo.toml                  |  2 +-
 ballista/rust/scheduler/Cargo.toml                 |  2 +-
 benchmarks/Cargo.toml                              |  2 +-
 benchmarks/src/bin/tpch.rs                         | 16 ++--
 examples/Cargo.toml                                |  2 +-
 15 files changed, 152 insertions(+), 54 deletions(-)

diff --git a/ballista-cli/Cargo.lock b/ballista-cli/Cargo.lock
index b74f03fe..248b69ec 100644
--- a/ballista-cli/Cargo.lock
+++ b/ballista-cli/Cargo.lock
@@ -500,7 +500,7 @@ dependencies = [
 [[package]]
 name = "datafusion"
 version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
+source = "git+https://github.com/apache/arrow-datafusion?rev=fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7#fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7"
 dependencies = [
  "ahash",
  "arrow",
@@ -513,7 +513,9 @@ dependencies = [
  "datafusion-row",
  "datafusion-sql",
  "futures",
+ "glob",
  "hashbrown 0.12.1",
+ "itertools",
  "lazy_static",
  "log",
  "num_cpus",
@@ -528,13 +530,14 @@ dependencies = [
  "tempfile",
  "tokio",
  "tokio-stream",
+ "url",
  "uuid",
 ]
 
 [[package]]
 name = "datafusion-cli"
 version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
+source = "git+https://github.com/apache/arrow-datafusion?rev=fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7#fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7"
 dependencies = [
  "arrow",
  "clap",
@@ -549,7 +552,7 @@ dependencies = [
 [[package]]
 name = "datafusion-common"
 version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
+source = "git+https://github.com/apache/arrow-datafusion?rev=fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7#fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7"
 dependencies = [
  "arrow",
  "ordered-float 3.0.0",
@@ -560,12 +563,11 @@ dependencies = [
 [[package]]
 name = "datafusion-data-access"
 version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
+source = "git+https://github.com/apache/arrow-datafusion?rev=fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7#fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7"
 dependencies = [
  "async-trait",
  "chrono",
  "futures",
- "glob",
  "parking_lot",
  "tempfile",
  "tokio",
@@ -574,7 +576,7 @@ dependencies = [
 [[package]]
 name = "datafusion-expr"
 version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
+source = "git+https://github.com/apache/arrow-datafusion?rev=fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7#fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7"
 dependencies = [
  "ahash",
  "arrow",
@@ -585,7 +587,7 @@ dependencies = [
 [[package]]
 name = "datafusion-physical-expr"
 version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
+source = "git+https://github.com/apache/arrow-datafusion?rev=fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7#fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7"
 dependencies = [
  "ahash",
  "arrow",
@@ -609,17 +611,23 @@ dependencies = [
 [[package]]
 name = "datafusion-proto"
 version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
+source = "git+https://github.com/apache/arrow-datafusion?rev=fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7#fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7"
 dependencies = [
+ "arrow",
+ "async-trait",
  "datafusion",
+ "datafusion-common",
+ "datafusion-data-access",
+ "datafusion-expr",
  "prost",
+ "tokio",
  "tonic-build",
 ]
 
 [[package]]
 name = "datafusion-row"
 version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
+source = "git+https://github.com/apache/arrow-datafusion?rev=fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7#fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7"
 dependencies = [
  "arrow",
  "datafusion-common",
@@ -630,7 +638,7 @@ dependencies = [
 [[package]]
 name = "datafusion-sql"
 version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
+source = "git+https://github.com/apache/arrow-datafusion?rev=fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7#fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7"
 dependencies = [
  "ahash",
  "arrow",
@@ -804,6 +812,16 @@ version = "1.0.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
 
+[[package]]
+name = "form_urlencoded"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191"
+dependencies = [
+ "matches",
+ "percent-encoding",
+]
+
 [[package]]
 name = "futures"
 version = "0.3.21"
@@ -1072,6 +1090,17 @@ dependencies = [
  "tokio-io-timeout",
 ]
 
+[[package]]
+name = "idna"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8"
+dependencies = [
+ "matches",
+ "unicode-bidi",
+ "unicode-normalization",
+]
+
 [[package]]
 name = "indexmap"
 version = "1.8.1"
@@ -1273,6 +1302,12 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "matches"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
+
 [[package]]
 name = "matchit"
 version = "0.5.0"
@@ -2134,6 +2169,21 @@ dependencies = [
  "threadpool",
 ]
 
+[[package]]
+name = "tinyvec"
+version = "1.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50"
+dependencies = [
+ "tinyvec_macros",
+]
+
+[[package]]
+name = "tinyvec_macros"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
+
 [[package]]
 name = "tokio"
 version = "1.18.1"
@@ -2350,6 +2400,21 @@ version = "1.15.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987"
 
+[[package]]
+name = "unicode-bidi"
+version = "0.3.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992"
+
+[[package]]
+name = "unicode-normalization"
+version = "0.1.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d54590932941a9e9266f0832deed84ebe1bf2e4c9e4a3554d393d18f5e854bf9"
+dependencies = [
+ "tinyvec",
+]
+
 [[package]]
 name = "unicode-segmentation"
 version = "1.9.0"
@@ -2368,6 +2433,18 @@ version = "0.2.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "957e51f3646910546462e67d5f7599b9e4fb8acdd304b087a6494730f9eebf04"
 
+[[package]]
+name = "url"
+version = "2.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a507c383b2d33b5fc35d1861e77e6b383d158b2da5e14fe51b83dfedf6fd578c"
+dependencies = [
+ "form_urlencoded",
+ "idna",
+ "matches",
+ "percent-encoding",
+]
+
 [[package]]
 name = "utf8parse"
 version = "0.2.0"
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index bed5dd74..11b8cf4b 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -31,8 +31,8 @@ readme = "README.md"
 [dependencies]
 ballista = { path = "../ballista/rust/client", version = "0.7.0" }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
-datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7" }
 dirs = "4.0.0"
 env_logger = "0.9"
 mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index 0031be89..5566e47b 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -32,7 +32,7 @@ ballista-core = { path = "../core", version = "0.7.0" }
 ballista-executor = { path = "../executor", version = "0.7.0", optional = true }
 ballista-scheduler = { path = "../scheduler", version = "0.7.0", optional = true }
 
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7" }
 futures = "0.3"
 log = "0.4"
 parking_lot = "0.12"
diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 97964fd6..b34afd20 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -592,7 +592,7 @@ mod tests {
 
                     let config = ListingTableConfig::new(
                         listing_table.object_store().clone(),
-                        listing_table.table_path().to_string(),
+                        listing_table.table_path().clone(),
                     )
                     .with_schema(Arc::new(Schema::new(vec![])))
                     .with_listing_options(error_options);
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 07b5c219..553cebb4 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -39,8 +39,8 @@ arrow-flight = { version = "14.0.0" }
 async-trait = "0.1.41"
 chrono = { version = "0.4", default-features = false }
 clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7" }
 futures = "0.3"
 hashbrown = "0.12"
 
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 7a53f510..8eb57f72 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -453,6 +453,7 @@ message FileScanExecConf {
   ScanLimit limit = 5;
   Statistics statistics = 6;
   repeated string table_partition_cols = 7;
+  string object_store_url = 8;
 }
 
 message ParquetScanExecNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 39034d94..aab6c3e4 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -27,7 +27,9 @@ use datafusion::datasource::file_format::avro::AvroFormat;
 use datafusion::datasource::file_format::csv::CsvFormat;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
-use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
+use datafusion::datasource::listing::{
+    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
 use datafusion::logical_plan::plan::{
     Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, Window,
 };
@@ -219,6 +221,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                         FileFormatType::Avro(..) => Arc::new(AvroFormat::default()),
                     };
 
+                let url = ListingTableUrl::parse(&scan.path)?;
                 let options = ListingOptions {
                     file_extension: scan.file_extension.clone(),
                     format: file_format,
@@ -227,16 +230,12 @@ impl AsLogicalPlan for LogicalPlanNode {
                     target_partitions: scan.target_partitions as usize,
                 };
 
-                let object_store = ctx
-                    .runtime_env()
-                    .object_store(scan.path.as_str())
-                    .map_err(|e| {
-                        BallistaError::NotImplemented(format!(
-                            "No object store is registered for path {}: {:?}",
-                            scan.path, e
-                        ))
-                    })?
-                    .0;
+                let object_store = ctx.runtime_env().object_store(&url).map_err(|e| {
+                    BallistaError::NotImplemented(format!(
+                        "No object store is registered for path {}: {:?}",
+                        scan.path, e
+                    ))
+                })?;
 
                 println!(
                     "Found object store {:?} for path {}",
@@ -244,7 +243,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                     scan.path.as_str()
                 );
 
-                let config = ListingTableConfig::new(object_store, scan.path.as_str())
+                let config = ListingTableConfig::new(object_store, url)
                     .with_listing_options(options)
                     .with_schema(Arc::new(schema));
 
@@ -595,7 +594,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                                     .options()
                                     .table_partition_cols
                                     .clone(),
-                                path: listing_table.table_path().to_owned(),
+                                path: listing_table.table_path().to_string(),
                                 schema: Some(schema),
                                 projection,
                                 filters,
@@ -1056,6 +1055,7 @@ mod roundtrip_tests {
     use async_trait::async_trait;
     use core::panic;
     use datafusion::common::DFSchemaRef;
+    use datafusion::datasource::listing::ListingTableUrl;
     use datafusion::logical_plan::source_as_provider;
     use datafusion::{
         arrow::datatypes::{DataType, Field, Schema},
@@ -1176,7 +1176,7 @@ mod roundtrip_tests {
             vec![col("c1") + col("c2"), Expr::Literal((4.0).into())];
 
         let plan = std::sync::Arc::new(
-            test_scan_csv("employee.csv", Some(vec![3, 4]))
+            test_scan_csv("employee", Some(vec![3, 4]))
                 .await?
                 .sort(vec![col("salary")])?
                 .build()?,
@@ -1248,13 +1248,13 @@ mod roundtrip_tests {
 
     #[tokio::test]
     async fn roundtrip_analyze() -> Result<()> {
-        let verbose_plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+        let verbose_plan = test_scan_csv("employee", Some(vec![3, 4]))
             .await?
             .sort(vec![col("salary")])?
             .explain(true, true)?
             .build()?;
 
-        let plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+        let plan = test_scan_csv("employee", Some(vec![3, 4]))
             .await?
             .sort(vec![col("salary")])?
             .explain(false, true)?
@@ -1269,13 +1269,13 @@ mod roundtrip_tests {
 
     #[tokio::test]
     async fn roundtrip_explain() -> Result<()> {
-        let verbose_plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+        let verbose_plan = test_scan_csv("employee", Some(vec![3, 4]))
             .await?
             .sort(vec![col("salary")])?
             .explain(true, false)?
             .build()?;
 
-        let plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+        let plan = test_scan_csv("employee", Some(vec![3, 4]))
             .await?
             .sort(vec![col("salary")])?
             .explain(false, false)?
@@ -1311,7 +1311,7 @@ mod roundtrip_tests {
 
     #[tokio::test]
     async fn roundtrip_sort() -> Result<()> {
-        let plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+        let plan = test_scan_csv("employee", Some(vec![3, 4]))
             .await?
             .sort(vec![col("salary")])?
             .build()?;
@@ -1335,7 +1335,7 @@ mod roundtrip_tests {
 
     #[tokio::test]
     async fn roundtrip_logical_plan() -> Result<()> {
-        let plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+        let plan = test_scan_csv("employee", Some(vec![3, 4]))
             .await?
             .aggregate(vec![col("state")], vec![max(col("salary"))])?
             .build()?;
@@ -1355,9 +1355,10 @@ mod roundtrip_tests {
         ctx.runtime_env()
             .register_object_store("test", custom_object_store.clone());
 
-        let (os, uri) = ctx.runtime_env().object_store("test://foo.csv")?;
+        let url = ListingTableUrl::parse("test://foo.csv").unwrap();
+        let os = ctx.runtime_env().object_store(&url)?;
         assert_eq!("TestObjectStore", &format!("{:?}", os));
-        assert_eq!("foo.csv", uri);
+        assert_eq!("test://foo.csv", &url.to_string());
 
         let schema = test_schema();
         let plan = ctx
@@ -1415,7 +1416,11 @@ mod roundtrip_tests {
         let schema = test_schema();
         let ctx = SessionContext::new();
         let options = CsvReadOptions::new().schema(&schema);
-        let df = ctx.read_csv(table_name, options).await?;
+
+        let uri = format!("file:///{}.csv", table_name);
+        ctx.register_csv(table_name, &uri, options).await?;
+
+        let df = ctx.table(table_name)?;
         let plan = match df.to_logical_plan()? {
             LogicalPlan::TableScan(ref scan) => {
                 let mut scan = scan.clone();
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 6cd2dc18..76ed8b81 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -31,6 +31,7 @@ use datafusion::datafusion_data_access::{
     object_store::local::LocalFileSystem, FileMeta, SizedFile,
 };
 use datafusion::datasource::listing::{FileRange, PartitionedFile};
+use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::execution::context::ExecutionProps;
 use datafusion::logical_plan::FunctionRegistry;
 
@@ -381,6 +382,7 @@ impl TryInto<FileScanConfig> for &protobuf::FileScanExecConf {
 
         Ok(FileScanConfig {
             object_store: Arc::new(LocalFileSystem {}),
+            object_store_url: ObjectStoreUrl::parse(&self.object_store_url)?,
             file_schema: schema,
             file_groups: self
                 .file_groups
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs
index 33651a06..cb2c08c1 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -25,6 +25,7 @@ use datafusion::arrow::compute::SortOptions;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
 use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::logical_plan::window_frames::WindowFrame;
 use datafusion::logical_plan::FunctionRegistry;
@@ -1093,14 +1094,21 @@ fn decode_scan_config(
         .map(|f| f.try_into())
         .collect::<Result<Vec<_>, _>>()?;
 
-    let object_store = if let Some(file) = file_groups.get(0).and_then(|h| h.get(0)) {
-        runtime.object_store(file.file_meta.path())?.0
-    } else {
-        Arc::new(LocalFileSystem {})
+    let (object_store, object_store_url) = match proto.object_store_url.is_empty() {
+        false => {
+            let object_store_url = ObjectStoreUrl::parse(&proto.object_store_url)?;
+            let object_store = runtime.object_store(&object_store_url)?;
+            (object_store, object_store_url)
+        }
+        true => (
+            Arc::new(LocalFileSystem {}) as _,
+            ObjectStoreUrl::local_filesystem(),
+        ),
     };
 
     Ok(FileScanConfig {
         object_store,
+        object_store_url,
         file_schema: schema,
         file_groups,
         statistics,
@@ -1129,6 +1137,7 @@ mod roundtrip_tests {
     use std::sync::Arc;
 
     use datafusion::arrow::array::ArrayRef;
+    use datafusion::datasource::object_store::ObjectStoreUrl;
     use datafusion::execution::context::ExecutionProps;
     use datafusion::logical_expr::{BuiltinScalarFunction, Volatility};
     use datafusion::logical_plan::create_udf;
@@ -1370,6 +1379,7 @@ mod roundtrip_tests {
     fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
         let scan_config = FileScanConfig {
             object_store: Arc::new(LocalFileSystem {}),
+            object_store_url: ObjectStoreUrl::local_filesystem(),
             file_schema: Arc::new(Schema::new(vec![Field::new(
                 "col",
                 DataType::Utf8,
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 1b96a855..85aea6c4 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -441,6 +441,7 @@ impl TryFrom<&FileScanConfig> for protobuf::FileScanExecConf {
                 .collect(),
             schema: Some(conf.file_schema.as_ref().into()),
             table_partition_cols: conf.table_partition_cols.to_vec(),
+            object_store_url: conf.object_store_url.to_string(),
         })
     }
 }
diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml
index 206592a4..76590b5a 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -40,7 +40,7 @@ async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.7.0" }
 chrono = { version = "0.4", default-features = false }
 configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7" }
 env_logger = "0.9"
 futures = "0.3"
 hyper = "0.14.4"
diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml
index ee01cceb..008013ca 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -41,7 +41,7 @@ async-trait = "0.1.41"
 ballista-core = { path = "../core", version = "0.7.0" }
 clap = { version = "3", features = ["derive", "cargo"] }
 configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7" }
 env_logger = "0.9"
 etcd-client = { version = "0.9", optional = true }
 futures = "0.3"
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 5e59f362..29ac95f4 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -33,7 +33,7 @@ snmalloc = ["snmalloc-rs"]
 
 [dependencies]
 ballista = { path = "../ballista/rust/client" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7" }
 env_logger = "0.9"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 8542dc3d..77ced15c 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -58,6 +58,7 @@ use datafusion::{
 
 use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
 use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
+use datafusion::datasource::listing::ListingTableUrl;
 use serde::Serialize;
 use structopt::StructOpt;
 
@@ -767,7 +768,8 @@ fn get_table(
         table_partition_cols: vec![],
     };
 
-    let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), path)
+    let url = ListingTableUrl::parse(path)?;
+    let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), url)
         .with_listing_options(options)
         .with_schema(schema);
 
@@ -1466,6 +1468,7 @@ mod tests {
         use ballista_core::serde::{
             protobuf, AsExecutionPlan, AsLogicalPlan, BallistaCodec,
         };
+        use datafusion::datasource::listing::ListingTableUrl;
         use datafusion::physical_plan::ExecutionPlan;
         use std::ops::Deref;
 
@@ -1483,6 +1486,7 @@ mod tests {
             // is not set.
             let tpch_data_path =
                 env::var("TPCH_DATA").unwrap_or_else(|_| "./".to_string());
+            let path = ListingTableUrl::parse(tpch_data_path)?;
 
             for &table in TABLES {
                 let schema = get_schema(table);
@@ -1492,12 +1496,10 @@ mod tests {
                     .has_header(false)
                     .file_extension(".tbl");
                 let listing_options = options.to_listing_options(1);
-                let config = ListingTableConfig::new(
-                    Arc::new(LocalFileSystem {}),
-                    tpch_data_path.clone(),
-                )
-                .with_listing_options(listing_options)
-                .with_schema(Arc::new(schema));
+                let config =
+                    ListingTableConfig::new(Arc::new(LocalFileSystem {}), path.clone())
+                        .with_listing_options(listing_options)
+                        .with_schema(Arc::new(schema));
                 let provider = ListingTable::try_new(config)?;
                 ctx.register_table(table, Arc::new(provider))?;
             }
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index f925cf10..31b9db03 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
 
 [dependencies]
 ballista = { path = "../ballista/rust/client", version = "0.7.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7" }
 futures = "0.3"
 num_cpus = "1.13.0"
 prost = "0.10"

Reply via email to