This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 311fa532 Update to apache/arrow-datafusion#2578 (#48)
311fa532 is described below
commit 311fa532655f1ae28dc8958036bd4e6fff80e5fa
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue May 31 12:35:20 2022 +0100
Update to apache/arrow-datafusion#2578 (#48)
* Update to https://github.com/apache/arrow-datafusion/pull/2578
* Fix standalone build
* Update datafusion pin
---
ballista-cli/Cargo.lock | 97 +++++++++++++++++++---
ballista-cli/Cargo.toml | 4 +-
ballista/rust/client/Cargo.toml | 2 +-
ballista/rust/client/src/context.rs | 2 +-
ballista/rust/core/Cargo.toml | 4 +-
ballista/rust/core/proto/ballista.proto | 1 +
ballista/rust/core/src/serde/logical_plan/mod.rs | 51 +++++++-----
.../core/src/serde/physical_plan/from_proto.rs | 2 +
ballista/rust/core/src/serde/physical_plan/mod.rs | 18 +++-
.../rust/core/src/serde/physical_plan/to_proto.rs | 1 +
ballista/rust/executor/Cargo.toml | 2 +-
ballista/rust/scheduler/Cargo.toml | 2 +-
benchmarks/Cargo.toml | 2 +-
benchmarks/src/bin/tpch.rs | 16 ++--
examples/Cargo.toml | 2 +-
15 files changed, 152 insertions(+), 54 deletions(-)
diff --git a/ballista-cli/Cargo.lock b/ballista-cli/Cargo.lock
index b74f03fe..248b69ec 100644
--- a/ballista-cli/Cargo.lock
+++ b/ballista-cli/Cargo.lock
@@ -500,7 +500,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
+source = "git+https://github.com/apache/arrow-datafusion?rev=fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7#fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7"
dependencies = [
"ahash",
"arrow",
@@ -513,7 +513,9 @@ dependencies = [
"datafusion-row",
"datafusion-sql",
"futures",
+ "glob",
"hashbrown 0.12.1",
+ "itertools",
"lazy_static",
"log",
"num_cpus",
@@ -528,13 +530,14 @@ dependencies = [
"tempfile",
"tokio",
"tokio-stream",
+ "url",
"uuid",
]
[[package]]
name = "datafusion-cli"
version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
+source = "git+https://github.com/apache/arrow-datafusion?rev=fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7#fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7"
dependencies = [
"arrow",
"clap",
@@ -549,7 +552,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
+source = "git+https://github.com/apache/arrow-datafusion?rev=fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7#fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7"
dependencies = [
"arrow",
"ordered-float 3.0.0",
@@ -560,12 +563,11 @@ dependencies = [
[[package]]
name = "datafusion-data-access"
version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
+source = "git+https://github.com/apache/arrow-datafusion?rev=fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7#fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7"
dependencies = [
"async-trait",
"chrono",
"futures",
- "glob",
"parking_lot",
"tempfile",
"tokio",
@@ -574,7 +576,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
+source = "git+https://github.com/apache/arrow-datafusion?rev=fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7#fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7"
dependencies = [
"ahash",
"arrow",
@@ -585,7 +587,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
+source = "git+https://github.com/apache/arrow-datafusion?rev=fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7#fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7"
dependencies = [
"ahash",
"arrow",
@@ -609,17 +611,23 @@ dependencies = [
[[package]]
name = "datafusion-proto"
version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
+source = "git+https://github.com/apache/arrow-datafusion?rev=fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7#fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7"
dependencies = [
+ "arrow",
+ "async-trait",
"datafusion",
+ "datafusion-common",
+ "datafusion-data-access",
+ "datafusion-expr",
"prost",
+ "tokio",
"tonic-build",
]
[[package]]
name = "datafusion-row"
version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
+source = "git+https://github.com/apache/arrow-datafusion?rev=fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7#fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7"
dependencies = [
"arrow",
"datafusion-common",
@@ -630,7 +638,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "8.0.0"
-source = "git+https://github.com/apache/arrow-datafusion?rev=9ea7dc6036a7b1d28c7450db4f26720b732a50de#9ea7dc6036a7b1d28c7450db4f26720b732a50de"
+source = "git+https://github.com/apache/arrow-datafusion?rev=fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7#fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7"
dependencies = [
"ahash",
"arrow",
@@ -804,6 +812,16 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
+[[package]]
+name = "form_urlencoded"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191"
+dependencies = [
+ "matches",
+ "percent-encoding",
+]
+
[[package]]
name = "futures"
version = "0.3.21"
@@ -1072,6 +1090,17 @@ dependencies = [
"tokio-io-timeout",
]
+[[package]]
+name = "idna"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8"
+dependencies = [
+ "matches",
+ "unicode-bidi",
+ "unicode-normalization",
+]
+
[[package]]
name = "indexmap"
version = "1.8.1"
@@ -1273,6 +1302,12 @@ dependencies = [
"libc",
]
+[[package]]
+name = "matches"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
+
[[package]]
name = "matchit"
version = "0.5.0"
@@ -2134,6 +2169,21 @@ dependencies = [
"threadpool",
]
+[[package]]
+name = "tinyvec"
+version = "1.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50"
+dependencies = [
+ "tinyvec_macros",
+]
+
+[[package]]
+name = "tinyvec_macros"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
+
[[package]]
name = "tokio"
version = "1.18.1"
@@ -2350,6 +2400,21 @@ version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987"
+[[package]]
+name = "unicode-bidi"
+version = "0.3.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992"
+
+[[package]]
+name = "unicode-normalization"
+version = "0.1.19"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d54590932941a9e9266f0832deed84ebe1bf2e4c9e4a3554d393d18f5e854bf9"
+dependencies = [
+ "tinyvec",
+]
+
[[package]]
name = "unicode-segmentation"
version = "1.9.0"
@@ -2368,6 +2433,18 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "957e51f3646910546462e67d5f7599b9e4fb8acdd304b087a6494730f9eebf04"
+[[package]]
+name = "url"
+version = "2.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a507c383b2d33b5fc35d1861e77e6b383d158b2da5e14fe51b83dfedf6fd578c"
+dependencies = [
+ "form_urlencoded",
+ "idna",
+ "matches",
+ "percent-encoding",
+]
+
[[package]]
name = "utf8parse"
version = "0.2.0"
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index bed5dd74..11b8cf4b 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -31,8 +31,8 @@ readme = "README.md"
[dependencies]
ballista = { path = "../ballista/rust/client", version = "0.7.0" }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
-datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7" }
dirs = "4.0.0"
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index 0031be89..5566e47b 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -32,7 +32,7 @@ ballista-core = { path = "../core", version = "0.7.0" }
ballista-executor = { path = "../executor", version = "0.7.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "0.7.0", optional = true }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7" }
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 97964fd6..b34afd20 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -592,7 +592,7 @@ mod tests {
let config = ListingTableConfig::new(
listing_table.object_store().clone(),
- listing_table.table_path().to_string(),
+ listing_table.table_path().clone(),
)
.with_schema(Arc::new(Schema::new(vec![])))
.with_listing_options(error_options);
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 07b5c219..553cebb4 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -39,8 +39,8 @@ arrow-flight = { version = "14.0.0" }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7" }
futures = "0.3"
hashbrown = "0.12"
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 7a53f510..8eb57f72 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -453,6 +453,7 @@ message FileScanExecConf {
ScanLimit limit = 5;
Statistics statistics = 6;
repeated string table_partition_cols = 7;
+ string object_store_url = 8;
}
message ParquetScanExecNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 39034d94..aab6c3e4 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -27,7 +27,9 @@ use datafusion::datasource::file_format::avro::AvroFormat;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
-use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};
+use datafusion::datasource::listing::{
+ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
+};
use datafusion::logical_plan::plan::{
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, Window,
};
@@ -219,6 +221,7 @@ impl AsLogicalPlan for LogicalPlanNode {
FileFormatType::Avro(..) => Arc::new(AvroFormat::default()),
};
+ let url = ListingTableUrl::parse(&scan.path)?;
let options = ListingOptions {
file_extension: scan.file_extension.clone(),
format: file_format,
@@ -227,16 +230,12 @@ impl AsLogicalPlan for LogicalPlanNode {
target_partitions: scan.target_partitions as usize,
};
- let object_store = ctx
- .runtime_env()
- .object_store(scan.path.as_str())
- .map_err(|e| {
- BallistaError::NotImplemented(format!(
- "No object store is registered for path {}: {:?}",
- scan.path, e
- ))
- })?
- .0;
+ let object_store = ctx.runtime_env().object_store(&url).map_err(|e| {
+ BallistaError::NotImplemented(format!(
+ "No object store is registered for path {}: {:?}",
+ scan.path, e
+ ))
+ })?;
println!(
"Found object store {:?} for path {}",
@@ -244,7 +243,7 @@ impl AsLogicalPlan for LogicalPlanNode {
scan.path.as_str()
);
- let config = ListingTableConfig::new(object_store, scan.path.as_str())
+ let config = ListingTableConfig::new(object_store, url)
.with_listing_options(options)
.with_schema(Arc::new(schema));
@@ -595,7 +594,7 @@ impl AsLogicalPlan for LogicalPlanNode {
.options()
.table_partition_cols
.clone(),
- path: listing_table.table_path().to_owned(),
+ path: listing_table.table_path().to_string(),
schema: Some(schema),
projection,
filters,
@@ -1056,6 +1055,7 @@ mod roundtrip_tests {
use async_trait::async_trait;
use core::panic;
use datafusion::common::DFSchemaRef;
+ use datafusion::datasource::listing::ListingTableUrl;
use datafusion::logical_plan::source_as_provider;
use datafusion::{
arrow::datatypes::{DataType, Field, Schema},
@@ -1176,7 +1176,7 @@ mod roundtrip_tests {
vec![col("c1") + col("c2"), Expr::Literal((4.0).into())];
let plan = std::sync::Arc::new(
- test_scan_csv("employee.csv", Some(vec![3, 4]))
+ test_scan_csv("employee", Some(vec![3, 4]))
.await?
.sort(vec![col("salary")])?
.build()?,
@@ -1248,13 +1248,13 @@ mod roundtrip_tests {
#[tokio::test]
async fn roundtrip_analyze() -> Result<()> {
- let verbose_plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+ let verbose_plan = test_scan_csv("employee", Some(vec![3, 4]))
.await?
.sort(vec![col("salary")])?
.explain(true, true)?
.build()?;
- let plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+ let plan = test_scan_csv("employee", Some(vec![3, 4]))
.await?
.sort(vec![col("salary")])?
.explain(false, true)?
@@ -1269,13 +1269,13 @@ mod roundtrip_tests {
#[tokio::test]
async fn roundtrip_explain() -> Result<()> {
- let verbose_plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+ let verbose_plan = test_scan_csv("employee", Some(vec![3, 4]))
.await?
.sort(vec![col("salary")])?
.explain(true, false)?
.build()?;
- let plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+ let plan = test_scan_csv("employee", Some(vec![3, 4]))
.await?
.sort(vec![col("salary")])?
.explain(false, false)?
@@ -1311,7 +1311,7 @@ mod roundtrip_tests {
#[tokio::test]
async fn roundtrip_sort() -> Result<()> {
- let plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+ let plan = test_scan_csv("employee", Some(vec![3, 4]))
.await?
.sort(vec![col("salary")])?
.build()?;
@@ -1335,7 +1335,7 @@ mod roundtrip_tests {
#[tokio::test]
async fn roundtrip_logical_plan() -> Result<()> {
- let plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+ let plan = test_scan_csv("employee", Some(vec![3, 4]))
.await?
.aggregate(vec![col("state")], vec![max(col("salary"))])?
.build()?;
@@ -1355,9 +1355,10 @@ mod roundtrip_tests {
ctx.runtime_env()
.register_object_store("test", custom_object_store.clone());
- let (os, uri) = ctx.runtime_env().object_store("test://foo.csv")?;
+ let url = ListingTableUrl::parse("test://foo.csv").unwrap();
+ let os = ctx.runtime_env().object_store(&url)?;
assert_eq!("TestObjectStore", &format!("{:?}", os));
- assert_eq!("foo.csv", uri);
+ assert_eq!("test://foo.csv", &url.to_string());
let schema = test_schema();
let plan = ctx
@@ -1415,7 +1416,11 @@ mod roundtrip_tests {
let schema = test_schema();
let ctx = SessionContext::new();
let options = CsvReadOptions::new().schema(&schema);
- let df = ctx.read_csv(table_name, options).await?;
+
+ let uri = format!("file:///{}.csv", table_name);
+ ctx.register_csv(table_name, &uri, options).await?;
+
+ let df = ctx.table(table_name)?;
let plan = match df.to_logical_plan()? {
LogicalPlan::TableScan(ref scan) => {
let mut scan = scan.clone();
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 6cd2dc18..76ed8b81 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -31,6 +31,7 @@ use datafusion::datafusion_data_access::{
object_store::local::LocalFileSystem, FileMeta, SizedFile,
};
use datafusion::datasource::listing::{FileRange, PartitionedFile};
+use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_plan::FunctionRegistry;
@@ -381,6 +382,7 @@ impl TryInto<FileScanConfig> for &protobuf::FileScanExecConf {
Ok(FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
+ object_store_url: ObjectStoreUrl::parse(&self.object_store_url)?,
file_schema: schema,
file_groups: self
.file_groups
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs
index 33651a06..cb2c08c1 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -25,6 +25,7 @@ use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::logical_plan::window_frames::WindowFrame;
use datafusion::logical_plan::FunctionRegistry;
@@ -1093,14 +1094,21 @@ fn decode_scan_config(
.map(|f| f.try_into())
.collect::<Result<Vec<_>, _>>()?;
- let object_store = if let Some(file) = file_groups.get(0).and_then(|h| h.get(0)) {
- runtime.object_store(file.file_meta.path())?.0
- } else {
- Arc::new(LocalFileSystem {})
+ let (object_store, object_store_url) = match proto.object_store_url.is_empty() {
+ false => {
+ let object_store_url = ObjectStoreUrl::parse(&proto.object_store_url)?;
+ let object_store = runtime.object_store(&object_store_url)?;
+ (object_store, object_store_url)
+ }
+ true => (
+ Arc::new(LocalFileSystem {}) as _,
+ ObjectStoreUrl::local_filesystem(),
+ ),
};
Ok(FileScanConfig {
object_store,
+ object_store_url,
file_schema: schema,
file_groups,
statistics,
@@ -1129,6 +1137,7 @@ mod roundtrip_tests {
use std::sync::Arc;
use datafusion::arrow::array::ArrayRef;
+ use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::{BuiltinScalarFunction, Volatility};
use datafusion::logical_plan::create_udf;
@@ -1370,6 +1379,7 @@ mod roundtrip_tests {
fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
let scan_config = FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
+ object_store_url: ObjectStoreUrl::local_filesystem(),
file_schema: Arc::new(Schema::new(vec![Field::new(
"col",
DataType::Utf8,
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 1b96a855..85aea6c4 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -441,6 +441,7 @@ impl TryFrom<&FileScanConfig> for protobuf::FileScanExecConf {
.collect(),
schema: Some(conf.file_schema.as_ref().into()),
table_partition_cols: conf.table_partition_cols.to_vec(),
+ object_store_url: conf.object_store_url.to_string(),
})
}
}
diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml
index 206592a4..76590b5a 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -40,7 +40,7 @@ async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.7.0" }
chrono = { version = "0.4", default-features = false }
configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7" }
env_logger = "0.9"
futures = "0.3"
hyper = "0.14.4"
diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml
index ee01cceb..008013ca 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -41,7 +41,7 @@ async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.7.0" }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7" }
env_logger = "0.9"
etcd-client = { version = "0.9", optional = true }
futures = "0.3"
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 5e59f362..29ac95f4 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -33,7 +33,7 @@ snmalloc = ["snmalloc-rs"]
[dependencies]
ballista = { path = "../ballista/rust/client" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7" }
env_logger = "0.9"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 8542dc3d..77ced15c 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -58,6 +58,7 @@ use datafusion::{
use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
+use datafusion::datasource::listing::ListingTableUrl;
use serde::Serialize;
use structopt::StructOpt;
@@ -767,7 +768,8 @@ fn get_table(
table_partition_cols: vec![],
};
- let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), path)
+ let url = ListingTableUrl::parse(path)?;
+ let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), url)
.with_listing_options(options)
.with_schema(schema);
@@ -1466,6 +1468,7 @@ mod tests {
use ballista_core::serde::{
protobuf, AsExecutionPlan, AsLogicalPlan, BallistaCodec,
};
+ use datafusion::datasource::listing::ListingTableUrl;
use datafusion::physical_plan::ExecutionPlan;
use std::ops::Deref;
@@ -1483,6 +1486,7 @@ mod tests {
// is not set.
let tpch_data_path =
env::var("TPCH_DATA").unwrap_or_else(|_| "./".to_string());
+ let path = ListingTableUrl::parse(tpch_data_path)?;
for &table in TABLES {
let schema = get_schema(table);
@@ -1492,12 +1496,10 @@ mod tests {
.has_header(false)
.file_extension(".tbl");
let listing_options = options.to_listing_options(1);
- let config = ListingTableConfig::new(
- Arc::new(LocalFileSystem {}),
- tpch_data_path.clone(),
- )
- .with_listing_options(listing_options)
- .with_schema(Arc::new(schema));
+ let config =
+ ListingTableConfig::new(Arc::new(LocalFileSystem {}), path.clone())
+ .with_listing_options(listing_options)
+ .with_schema(Arc::new(schema));
let provider = ListingTable::try_new(config)?;
ctx.register_table(table, Arc::new(provider))?;
}
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index f925cf10..31b9db03 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
[dependencies]
ballista = { path = "../ballista/rust/client", version = "0.7.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "b6fb0dd52c2abd0f8e134aa46cc1571cc6a0971b" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "fdb8fecf0ab475ba07dc0d15f7b53e25ccf30ee7" }
futures = "0.3"
num_cpus = "1.13.0"
prost = "0.10"