This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 00defaa5 Remove ObjectStore from FileScanConfig and ListingTableConfig (#53)
00defaa5 is described below
commit 00defaa5ac3abfd2d75e18beb86486160cbf5887
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Jun 2 14:11:05 2022 +0100
Remove ObjectStore from FileScanConfig and ListingTableConfig (#53)
* Remove ObjectStore from FileScanConfig and ListingTableConfig
* Update datafusion pin
---
ballista-cli/Cargo.lock | 18 ++++++-------
ballista-cli/Cargo.toml | 4 +--
ballista/rust/client/Cargo.toml | 2 +-
ballista/rust/client/src/context.rs | 12 ++++-----
ballista/rust/core/Cargo.toml | 4 +--
ballista/rust/core/src/serde/logical_plan/mod.rs | 30 ++++++----------------
.../core/src/serde/physical_plan/from_proto.rs | 5 +---
ballista/rust/core/src/serde/physical_plan/mod.rs | 24 +++++------------
ballista/rust/executor/Cargo.toml | 2 +-
ballista/rust/scheduler/Cargo.toml | 2 +-
benchmarks/Cargo.toml | 2 +-
benchmarks/src/bin/tpch.rs | 14 +++++-----
examples/Cargo.toml | 2 +-
13 files changed, 44 insertions(+), 77 deletions(-)
diff --git a/ballista-cli/Cargo.lock b/ballista-cli/Cargo.lock
index c2f52d52..03572aff 100644
--- a/ballista-cli/Cargo.lock
+++ b/ballista-cli/Cargo.lock
@@ -500,7 +500,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "8.0.0"
-source = "git+https://github.com/tustvold/arrow-datafusion?rev=9244547848cf2533d86d2ddab7c8ec3268560f10#9244547848cf2533d86d2ddab7c8ec3268560f10"
+source = "git+https://github.com/tustvold/arrow-datafusion?rev=57a88f1f93c7d2a582f71aa040190ad79a231a7c#57a88f1f93c7d2a582f71aa040190ad79a231a7c"
dependencies = [
"ahash",
"arrow",
@@ -537,7 +537,7 @@ dependencies = [
[[package]]
name = "datafusion-cli"
version = "8.0.0"
-source = "git+https://github.com/tustvold/arrow-datafusion?rev=9244547848cf2533d86d2ddab7c8ec3268560f10#9244547848cf2533d86d2ddab7c8ec3268560f10"
+source = "git+https://github.com/tustvold/arrow-datafusion?rev=57a88f1f93c7d2a582f71aa040190ad79a231a7c#57a88f1f93c7d2a582f71aa040190ad79a231a7c"
dependencies = [
"arrow",
"clap",
@@ -552,7 +552,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "8.0.0"
-source = "git+https://github.com/tustvold/arrow-datafusion?rev=9244547848cf2533d86d2ddab7c8ec3268560f10#9244547848cf2533d86d2ddab7c8ec3268560f10"
+source = "git+https://github.com/tustvold/arrow-datafusion?rev=57a88f1f93c7d2a582f71aa040190ad79a231a7c#57a88f1f93c7d2a582f71aa040190ad79a231a7c"
dependencies = [
"arrow",
"ordered-float 3.0.0",
@@ -563,7 +563,7 @@ dependencies = [
[[package]]
name = "datafusion-data-access"
version = "8.0.0"
-source = "git+https://github.com/tustvold/arrow-datafusion?rev=9244547848cf2533d86d2ddab7c8ec3268560f10#9244547848cf2533d86d2ddab7c8ec3268560f10"
+source = "git+https://github.com/tustvold/arrow-datafusion?rev=57a88f1f93c7d2a582f71aa040190ad79a231a7c#57a88f1f93c7d2a582f71aa040190ad79a231a7c"
dependencies = [
"async-trait",
"chrono",
@@ -576,7 +576,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "8.0.0"
-source = "git+https://github.com/tustvold/arrow-datafusion?rev=9244547848cf2533d86d2ddab7c8ec3268560f10#9244547848cf2533d86d2ddab7c8ec3268560f10"
+source = "git+https://github.com/tustvold/arrow-datafusion?rev=57a88f1f93c7d2a582f71aa040190ad79a231a7c#57a88f1f93c7d2a582f71aa040190ad79a231a7c"
dependencies = [
"ahash",
"arrow",
@@ -587,7 +587,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "8.0.0"
-source = "git+https://github.com/tustvold/arrow-datafusion?rev=9244547848cf2533d86d2ddab7c8ec3268560f10#9244547848cf2533d86d2ddab7c8ec3268560f10"
+source = "git+https://github.com/tustvold/arrow-datafusion?rev=57a88f1f93c7d2a582f71aa040190ad79a231a7c#57a88f1f93c7d2a582f71aa040190ad79a231a7c"
dependencies = [
"ahash",
"arrow",
@@ -611,7 +611,7 @@ dependencies = [
[[package]]
name = "datafusion-proto"
version = "8.0.0"
-source = "git+https://github.com/tustvold/arrow-datafusion?rev=9244547848cf2533d86d2ddab7c8ec3268560f10#9244547848cf2533d86d2ddab7c8ec3268560f10"
+source = "git+https://github.com/tustvold/arrow-datafusion?rev=57a88f1f93c7d2a582f71aa040190ad79a231a7c#57a88f1f93c7d2a582f71aa040190ad79a231a7c"
dependencies = [
"arrow",
"datafusion",
@@ -624,7 +624,7 @@ dependencies = [
[[package]]
name = "datafusion-row"
version = "8.0.0"
-source = "git+https://github.com/tustvold/arrow-datafusion?rev=9244547848cf2533d86d2ddab7c8ec3268560f10#9244547848cf2533d86d2ddab7c8ec3268560f10"
+source = "git+https://github.com/tustvold/arrow-datafusion?rev=57a88f1f93c7d2a582f71aa040190ad79a231a7c#57a88f1f93c7d2a582f71aa040190ad79a231a7c"
dependencies = [
"arrow",
"datafusion-common",
@@ -635,7 +635,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "8.0.0"
-source = "git+https://github.com/tustvold/arrow-datafusion?rev=9244547848cf2533d86d2ddab7c8ec3268560f10#9244547848cf2533d86d2ddab7c8ec3268560f10"
+source = "git+https://github.com/tustvold/arrow-datafusion?rev=57a88f1f93c7d2a582f71aa040190ad79a231a7c#57a88f1f93c7d2a582f71aa040190ad79a231a7c"
dependencies = [
"ahash",
"arrow",
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 9f19c2d0..b66dec1d 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -31,8 +31,8 @@ readme = "README.md"
[dependencies]
ballista = { path = "../ballista/rust/client", version = "0.7.0" }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9bf8bfbab47823effcbc78f98a675be64221f7cf" }
-datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "9bf8bfbab47823effcbc78f98a675be64221f7cf" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "585bc3a629b92ea7a86ebfe8bf762dbef4155710" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "585bc3a629b92ea7a86ebfe8bf762dbef4155710" }
dirs = "4.0.0"
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index 534f24fe..f891d6bc 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -32,7 +32,7 @@ ballista-core = { path = "../core", version = "0.7.0" }
ballista-executor = { path = "../executor", version = "0.7.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "0.7.0", optional = true }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9bf8bfbab47823effcbc78f98a675be64221f7cf" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "585bc3a629b92ea7a86ebfe8bf762dbef4155710" }
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index b34afd20..4209af5e 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -590,12 +590,10 @@ mod tests {
target_partitions: x.target_partitions,
};
- let config = ListingTableConfig::new(
- listing_table.object_store().clone(),
- listing_table.table_path().clone(),
- )
- .with_schema(Arc::new(Schema::new(vec![])))
- .with_listing_options(error_options);
+ let config =
+ ListingTableConfig::new(listing_table.table_path().clone())
+ .with_schema(Arc::new(Schema::new(vec![])))
+ .with_listing_options(error_options);
let error_table = ListingTable::try_new(config).unwrap();
@@ -612,7 +610,7 @@ mod tests {
.await
.unwrap();
let results = df.collect().await.unwrap();
- pretty::print_batches(&results);
+ pretty::print_batches(&results).unwrap();
}
#[tokio::test]
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 9a67d047..921f1ef6 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -39,8 +39,8 @@ arrow-flight = { version = "15.0.0" }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9bf8bfbab47823effcbc78f98a675be64221f7cf" }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "9bf8bfbab47823effcbc78f98a675be64221f7cf" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "585bc3a629b92ea7a86ebfe8bf762dbef4155710" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "585bc3a629b92ea7a86ebfe8bf762dbef4155710" }
futures = "0.3"
hashbrown = "0.12"
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index aab6c3e4..be76e538 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -230,20 +230,7 @@ impl AsLogicalPlan for LogicalPlanNode {
target_partitions: scan.target_partitions as usize,
};
- let object_store = ctx.runtime_env().object_store(&url).map_err(|e| {
- BallistaError::NotImplemented(format!(
- "No object store is registered for path {}: {:?}",
- scan.path, e
- ))
- })?;
-
- println!(
- "Found object store {:?} for path {}",
- object_store,
- scan.path.as_str()
- );
-
- let config = ListingTableConfig::new(object_store, url)
+ let config = ListingTableConfig::new(url)
.with_listing_options(options)
.with_schema(Arc::new(schema));
@@ -1355,15 +1342,16 @@ mod roundtrip_tests {
ctx.runtime_env()
.register_object_store("test", custom_object_store.clone());
- let url = ListingTableUrl::parse("test://foo.csv").unwrap();
+ let table_path = "test:///employee.csv";
+ let url = ListingTableUrl::parse(table_path).unwrap();
let os = ctx.runtime_env().object_store(&url)?;
assert_eq!("TestObjectStore", &format!("{:?}", os));
- assert_eq!("test://foo.csv", &url.to_string());
+ assert_eq!(table_path, &url.to_string());
let schema = test_schema();
let plan = ctx
.read_csv(
- "test://employee.csv",
+ table_path,
CsvReadOptions::new().schema(&schema).has_header(true),
)
.await?
@@ -1381,20 +1369,18 @@ mod roundtrip_tests {
assert_eq!(format!("{:?}", plan), format!("{:?}", round_trip));
- let round_trip_store = match round_trip {
+ let table_path = match round_trip {
LogicalPlan::TableScan(scan) => {
let source = source_as_provider(&scan.source)?;
match source.as_ref().as_any().downcast_ref::<ListingTable>() {
- Some(listing_table) => {
- format!("{:?}", listing_table.object_store())
- }
+ Some(listing_table) => listing_table.table_path().clone(),
_ => panic!("expected a ListingTable"),
}
}
_ => panic!("expected a TableScan"),
};
- assert_eq!(round_trip_store, format!("{:?}", custom_object_store));
+ assert_eq!(table_path.as_str(), url.as_str());
Ok(())
}
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index 76ed8b81..7cfb2084 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -27,9 +27,7 @@ use crate::convert_required;
use crate::serde::{from_proto_binary_op, proto_error, protobuf};
use chrono::{TimeZone, Utc};
-use datafusion::datafusion_data_access::{
- object_store::local::LocalFileSystem, FileMeta, SizedFile,
-};
+use datafusion::datafusion_data_access::{FileMeta, SizedFile};
use datafusion::datasource::listing::{FileRange, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::ExecutionProps;
@@ -381,7 +379,6 @@ impl TryInto<FileScanConfig> for &protobuf::FileScanExecConf {
let statistics = convert_required!(self.statistics)?;
Ok(FileScanConfig {
- object_store: Arc::new(LocalFileSystem {}),
object_store_url: ObjectStoreUrl::parse(&self.object_store_url)?,
file_schema: schema,
file_groups: self
diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs
index cb2c08c1..a0b12391 100644
--- a/ballista/rust/core/src/serde/physical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/physical_plan/mod.rs
@@ -23,7 +23,6 @@ use prost::Message;
use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::datatypes::SchemaRef;
-use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::runtime_env::RuntimeEnv;
@@ -154,7 +153,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
Ok(Arc::new(FilterExec::try_new(predicate, input)?))
}
PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new(
- decode_scan_config(scan.base_conf.as_ref().unwrap(), runtime)?,
+ decode_scan_config(scan.base_conf.as_ref().unwrap())?,
scan.has_header,
str_to_byte(&scan.delimiter)?,
))),
@@ -165,12 +164,12 @@ impl AsExecutionPlan for PhysicalPlanNode {
.map(|expr| parse_expr(expr, registry))
.transpose()?;
Ok(Arc::new(ParquetExec::new(
- decode_scan_config(scan.base_conf.as_ref().unwrap(), runtime)?,
+ decode_scan_config(scan.base_conf.as_ref().unwrap())?,
predicate,
)))
}
PhysicalPlanType::AvroScan(scan) => Ok(Arc::new(AvroExec::new(
- decode_scan_config(scan.base_conf.as_ref().unwrap(), runtime)?,
+ decode_scan_config(scan.base_conf.as_ref().unwrap())?,
))),
PhysicalPlanType::CoalesceBatches(coalesce_batches) => {
let input: Arc<dyn ExecutionPlan> = into_physical_plan!(
@@ -1073,7 +1072,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
fn decode_scan_config(
proto: &protobuf::FileScanExecConf,
- runtime: &RuntimeEnv,
) -> Result<FileScanConfig, BallistaError> {
let schema = Arc::new(convert_required!(proto.schema)?);
let projection = proto
@@ -1094,20 +1092,12 @@ fn decode_scan_config(
.map(|f| f.try_into())
.collect::<Result<Vec<_>, _>>()?;
- let (object_store, object_store_url) = match proto.object_store_url.is_empty() {
- false => {
- let object_store_url = ObjectStoreUrl::parse(&proto.object_store_url)?;
- let object_store = runtime.object_store(&object_store_url)?;
- (object_store, object_store_url)
- }
- true => (
- Arc::new(LocalFileSystem {}) as _,
- ObjectStoreUrl::local_filesystem(),
- ),
+ let object_store_url = match proto.object_store_url.is_empty() {
+ false => ObjectStoreUrl::parse(&proto.object_store_url)?,
+ true => ObjectStoreUrl::local_filesystem(),
};
Ok(FileScanConfig {
- object_store,
object_store_url,
file_schema: schema,
file_groups,
@@ -1151,7 +1141,6 @@ mod roundtrip_tests {
compute::kernels::sort::SortOptions,
datatypes::{DataType, Field, Schema},
},
- datafusion_data_access::object_store::local::LocalFileSystem,
datasource::listing::PartitionedFile,
logical_plan::{JoinType, Operator},
physical_plan::{
@@ -1378,7 +1367,6 @@ mod roundtrip_tests {
#[test]
fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
let scan_config = FileScanConfig {
- object_store: Arc::new(LocalFileSystem {}),
object_store_url: ObjectStoreUrl::local_filesystem(),
file_schema: Arc::new(Schema::new(vec![Field::new(
"col",
diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml
index 0f114d0f..653b2d98 100644
--- a/ballista/rust/executor/Cargo.toml
+++ b/ballista/rust/executor/Cargo.toml
@@ -40,7 +40,7 @@ async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.7.0" }
chrono = { version = "0.4", default-features = false }
configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9bf8bfbab47823effcbc78f98a675be64221f7cf" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "585bc3a629b92ea7a86ebfe8bf762dbef4155710" }
env_logger = "0.9"
futures = "0.3"
hyper = "0.14.4"
diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml
index 387a75a5..df6034d8 100644
--- a/ballista/rust/scheduler/Cargo.toml
+++ b/ballista/rust/scheduler/Cargo.toml
@@ -41,7 +41,7 @@ async-trait = "0.1.41"
ballista-core = { path = "../core", version = "0.7.0" }
clap = { version = "3", features = ["derive", "cargo"] }
configure_me = "0.4.0"
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9bf8bfbab47823effcbc78f98a675be64221f7cf" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "585bc3a629b92ea7a86ebfe8bf762dbef4155710" }
env_logger = "0.9"
etcd-client = { version = "0.9", optional = true }
futures = "0.3"
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 752354f3..d59053af 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -33,7 +33,7 @@ snmalloc = ["snmalloc-rs"]
[dependencies]
ballista = { path = "../ballista/rust/client" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9bf8bfbab47823effcbc78f98a675be64221f7cf" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "585bc3a629b92ea7a86ebfe8bf762dbef4155710" }
env_logger = "0.9"
futures = "0.3"
mimalloc = { version = "0.1", optional = true, default-features = false }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 77ced15c..0c266812 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -52,7 +52,6 @@ use datafusion::{
};
use datafusion::{
arrow::util::pretty,
- datafusion_data_access::object_store::local::LocalFileSystem,
datasource::listing::{ListingOptions, ListingTable, ListingTableConfig},
};
@@ -295,9 +294,9 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
if opt.mem_table {
println!("Loading table '{}' into memory", table);
let start = Instant::now();
- let task_ctx = ctx.task_ctx();
let memtable =
- MemTable::load(table_provider, Some(opt.partitions), task_ctx).await?;
+ MemTable::load(table_provider, Some(opt.partitions), &ctx.state())
+ .await?;
println!(
"Loaded table '{}' into memory in {} ms",
table,
@@ -769,7 +768,7 @@ fn get_table(
};
let url = ListingTableUrl::parse(path)?;
- let config = ListingTableConfig::new(Arc::new(LocalFileSystem {}), url)
+ let config = ListingTableConfig::new(url)
.with_listing_options(options)
.with_schema(schema);
@@ -1496,10 +1495,9 @@ mod tests {
.has_header(false)
.file_extension(".tbl");
let listing_options = options.to_listing_options(1);
- let config =
- ListingTableConfig::new(Arc::new(LocalFileSystem {}), path.clone())
- .with_listing_options(listing_options)
- .with_schema(Arc::new(schema));
+ let config = ListingTableConfig::new(path.clone())
+ .with_listing_options(listing_options)
+ .with_schema(Arc::new(schema));
let provider = ListingTable::try_new(config)?;
ctx.register_table(table, Arc::new(provider))?;
}
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index d21891c5..f81ad792 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -35,7 +35,7 @@ required-features = ["ballista/standalone"]
[dependencies]
ballista = { path = "../ballista/rust/client", version = "0.7.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "9bf8bfbab47823effcbc78f98a675be64221f7cf" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "585bc3a629b92ea7a86ebfe8bf762dbef4155710" }
futures = "0.3"
num_cpus = "1.13.0"
prost = "0.10"