This is an automated email from the ASF dual-hosted git repository. tustvold pushed a commit to branch fix-file-format in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
commit 9f155ee303353a7358c7e3274a71efb455138d75 Author: Raphael Taylor-Davies <[email protected]> AuthorDate: Tue May 24 15:46:06 2022 +0100 Update with file format breaking change --- ballista-cli/Cargo.lock | 58 +++++++++++++++------- ballista-cli/Cargo.toml | 4 +- ballista/rust/client/Cargo.toml | 2 +- ballista/rust/core/Cargo.toml | 4 +- ballista/rust/executor/Cargo.toml | 2 +- ballista/rust/scheduler/Cargo.toml | 2 +- .../rust/scheduler/src/scheduler_server/grpc.rs | 25 ++++++---- benchmarks/Cargo.toml | 2 +- examples/Cargo.toml | 2 +- 9 files changed, 64 insertions(+), 37 deletions(-) diff --git a/ballista-cli/Cargo.lock b/ballista-cli/Cargo.lock index 528148c9..6bc2da93 100644 --- a/ballista-cli/Cargo.lock +++ b/ballista-cli/Cargo.lock @@ -63,9 +63,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "arrow" -version = "13.0.0" +version = "14.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c6bee230122beb516ead31935a61f683715f987c6f003eff44ad6986624105a" +checksum = "0612b6a634de6c3f5e63fdaa6932f7bc598f92de0462ac6e69b0aebd77e093aa" dependencies = [ "bitflags", "chrono", @@ -88,9 +88,9 @@ dependencies = [ [[package]] name = "arrow-flight" -version = "13.0.0" +version = "14.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a3666d2dbc637fa979d1f0bf3031d39a80e709f3b9ec88e3d573c1d666bf553" +checksum = "ce7b7cfa8eb0dcb0691f18b6a1d9c81cfe3c42726c254be5128d15ebe7580a1d" dependencies = [ "arrow", "base64", @@ -198,7 +198,7 @@ dependencies = [ [[package]] name = "ballista" -version = "0.6.0" +version = "0.7.0" dependencies = [ "ballista-core", "datafusion", @@ -212,7 +212,7 @@ dependencies = [ [[package]] name = "ballista-cli" -version = "0.6.0" +version = "0.7.0" dependencies = [ "arrow", "ballista", @@ -228,7 +228,7 @@ dependencies = [ [[package]] name = "ballista-core" -version = "0.6.0" +version = "0.7.0" dependencies = [ "ahash", "arrow-flight", @@ -500,7 +500,8 @@ dependencies = [ [[package]] name = "datafusion" -version = "7.0.0" +version = "8.0.0" +source = "git+https://github.com/tustvold/arrow-datafusion?rev=c2008c5f97d8d664bda496aacfec3686631cf905#c2008c5f97d8d664bda496aacfec3686631cf905" dependencies = [ "ahash", "arrow", @@ -511,6 +512,7 @@ dependencies = [ "datafusion-expr", "datafusion-physical-expr", "datafusion-row", + "datafusion-sql", "futures", "hashbrown 0.12.1", "lazy_static", @@ -532,7 +534,8 @@ dependencies = [ [[package]] name = "datafusion-cli" -version = "7.0.0" +version = "8.0.0" +source = "git+https://github.com/tustvold/arrow-datafusion?rev=c2008c5f97d8d664bda496aacfec3686631cf905#c2008c5f97d8d664bda496aacfec3686631cf905" dependencies = [ "arrow", "clap", @@ -546,7 +549,8 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "7.0.0" +version = "8.0.0" +source = "git+https://github.com/tustvold/arrow-datafusion?rev=c2008c5f97d8d664bda496aacfec3686631cf905#c2008c5f97d8d664bda496aacfec3686631cf905" dependencies = [ "arrow", "ordered-float 3.0.0", @@ -556,7 +560,8 @@ dependencies = [ [[package]] name = "datafusion-data-access" -version = "7.0.0" +version = "8.0.0" +source = "git+https://github.com/tustvold/arrow-datafusion?rev=c2008c5f97d8d664bda496aacfec3686631cf905#c2008c5f97d8d664bda496aacfec3686631cf905" dependencies = [ "async-trait", "chrono", @@ -569,7 +574,8 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "7.0.0" +version = "8.0.0" +source = "git+https://github.com/tustvold/arrow-datafusion?rev=c2008c5f97d8d664bda496aacfec3686631cf905#c2008c5f97d8d664bda496aacfec3686631cf905" dependencies = [ "ahash", "arrow", @@ -579,7 +585,8 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "7.0.0" +version = "8.0.0" +source = "git+https://github.com/tustvold/arrow-datafusion?rev=c2008c5f97d8d664bda496aacfec3686631cf905#c2008c5f97d8d664bda496aacfec3686631cf905" dependencies = [ "ahash", "arrow", @@ -602,7 +609,8 @@ dependencies = [ [[package]] name = "datafusion-proto" -version = "7.0.0" +version = "8.0.0" +source = "git+https://github.com/tustvold/arrow-datafusion?rev=c2008c5f97d8d664bda496aacfec3686631cf905#c2008c5f97d8d664bda496aacfec3686631cf905" dependencies = [ "datafusion", "prost", @@ -611,7 +619,8 @@ dependencies = [ [[package]] name = "datafusion-row" -version = "7.0.0" +version = "8.0.0" +source = "git+https://github.com/tustvold/arrow-datafusion?rev=c2008c5f97d8d664bda496aacfec3686631cf905#c2008c5f97d8d664bda496aacfec3686631cf905" dependencies = [ "arrow", "datafusion-common", @@ -619,6 +628,20 @@ dependencies = [ "rand", ] +[[package]] +name = "datafusion-sql" +version = "8.0.0" +source = "git+https://github.com/tustvold/arrow-datafusion?rev=c2008c5f97d8d664bda496aacfec3686631cf905#c2008c5f97d8d664bda496aacfec3686631cf905" +dependencies = [ + "ahash", + "arrow", + "datafusion-common", + "datafusion-expr", + "hashbrown 0.12.1", + "sqlparser", + "tokio", +] + [[package]] name = "digest" version = "0.10.3" @@ -1526,14 +1549,15 @@ dependencies = [ [[package]] name = "parquet" -version = "13.0.0" +version = "14.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c6d737baed48775e87a69aa262f1fa2f1d6bd074dedbe9cac244b9aabf2a0b4" +checksum = "ba1185ee1da5091e40b86519265a44d2704e3916ff867059c915141cab14d413" dependencies = [ "arrow", "base64", "brotli", "byteorder", + "bytes", "chrono", "flate2", "lz4", diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml index ae098506..a7450087 100644 --- a/ballista-cli/Cargo.toml +++ b/ballista-cli/Cargo.toml @@ -32,8 +32,8 @@ readme = "README.md" arrow = { version = "14.0.0" } ballista = { path = "../ballista/rust/client", version = "0.7.0" } clap = { version = "3", features = ["derive", "cargo"] } -datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" } -datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" } +datafusion = { git = "https://github.com/tustvold/arrow-datafusion", rev = "c2008c5f97d8d664bda496aacfec3686631cf905" } +datafusion-cli = { git = "https://github.com/tustvold/arrow-datafusion", rev = "c2008c5f97d8d664bda496aacfec3686631cf905" } dirs = "4.0.0" env_logger = "0.9" mimalloc = { version = "0.1", default-features = false } diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml index 7980b432..14d0a8b6 100644 --- a/ballista/rust/client/Cargo.toml +++ b/ballista/rust/client/Cargo.toml @@ -32,7 +32,7 @@ ballista-core = { path = "../core", version = "0.7.0" } ballista-executor = { path = "../executor", version = "0.7.0", optional = true } ballista-scheduler = { path = "../scheduler", version = "0.7.0", optional = true } -datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" } +datafusion = { git = "https://github.com/tustvold/arrow-datafusion", rev = "c2008c5f97d8d664bda496aacfec3686631cf905" } futures = "0.3" log = "0.4" parking_lot = "0.12" diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml index 414335ec..2fd2188b 100644 --- a/ballista/rust/core/Cargo.toml +++ b/ballista/rust/core/Cargo.toml @@ -39,8 +39,8 @@ arrow-flight = { version = "14.0.0" } async-trait = "0.1.41" chrono = { version = "0.4", default-features = false } clap = { version = "3", features = ["derive", "cargo"] } -datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" } -datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" } +datafusion = { git = "https://github.com/tustvold/arrow-datafusion", rev = "c2008c5f97d8d664bda496aacfec3686631cf905" } +datafusion-proto = { git = "https://github.com/tustvold/arrow-datafusion", rev = "c2008c5f97d8d664bda496aacfec3686631cf905" } futures = "0.3" hashbrown = "0.12" diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml index a329a20c..554353f5 100644 --- a/ballista/rust/executor/Cargo.toml +++ b/ballista/rust/executor/Cargo.toml @@ -40,7 +40,7 @@ async-trait = "0.1.41" ballista-core = { path = "../core", version = "0.7.0" } chrono = { version = "0.4", default-features = false } configure_me = "0.4.0" -datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" } +datafusion = { git = "https://github.com/tustvold/arrow-datafusion", rev = "c2008c5f97d8d664bda496aacfec3686631cf905" } env_logger = "0.9" futures = "0.3" hyper = "0.14.4" diff --git a/ballista/rust/scheduler/Cargo.toml b/ballista/rust/scheduler/Cargo.toml index 699d4cd1..361981ea 100644 --- a/ballista/rust/scheduler/Cargo.toml +++ b/ballista/rust/scheduler/Cargo.toml @@ -41,7 +41,7 @@ async-trait = "0.1.41" ballista-core = { path = "../core", version = "0.7.0" } clap = { version = "3", features = ["derive", "cargo"] } configure_me = "0.4.0" -datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" } +datafusion = { git = "https://github.com/tustvold/arrow-datafusion", rev = "c2008c5f97d8d664bda496aacfec3686631cf905" } env_logger = "0.9" etcd-client = { version = "0.9", optional = true } futures = "0.3" diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs index 10be4786..9216155d 100644 --- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs +++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs @@ -38,7 +38,7 @@ use datafusion::datafusion_data_access::object_store::{ }; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; -use futures::StreamExt; +use futures::TryStreamExt; use log::{debug, error, info, trace, warn}; use rand::{distributions::Alphanumeric, thread_rng, Rng}; use std::convert::TryInto; @@ -281,7 +281,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc request: Request<GetFileMetadataParams>, ) -> std::result::Result<Response<GetFileMetadataResult>, tonic::Status> { // TODO support multiple object stores - let obj_store = LocalFileSystem {}; + let obj_store = Arc::new(LocalFileSystem {}) as Arc<dyn ObjectStore>; // TODO shouldn't this take a ListingOption object as input? let GetFileMetadataParams { path, file_type } = request.into_inner(); @@ -300,19 +300,22 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc )), }?; - let file_metas = obj_store.list_file(&path).await.map_err(|e| { - let msg = format!("Error listing files: {}", e); - error!("{}", msg); - tonic::Status::internal(msg) - })?; - - let obj_readers = file_metas.map(move |f| obj_store.file_reader(f?.sized_file)); + let file_metas: Vec<_> = obj_store + .list_file(&path) + .await + .map_err(|e| { + let msg = format!("Error listing files: {}", e); + error!("{}", msg); + tonic::Status::internal(msg) + })? + .try_collect() + .await?; let schema = file_format - .infer_schema(Box::pin(obj_readers)) + .infer_schema(&obj_store, &file_metas) .await .map_err(|e| { - let msg = format!("Error infering schema: {}", e); + let msg = format!("Error inferring schema: {}", e); error!("{}", msg); tonic::Status::internal(msg) })?; diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 15dfe43b..dd202795 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -33,7 +33,7 @@ snmalloc = ["snmalloc-rs"] [dependencies] ballista = { path = "../ballista/rust/client" } -datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" } +datafusion = { git = "https://github.com/tustvold/arrow-datafusion", rev = "c2008c5f97d8d664bda496aacfec3686631cf905" } env_logger = "0.9" futures = "0.3" mimalloc = { version = "0.1", optional = true, default-features = false } diff --git a/examples/Cargo.toml b/examples/Cargo.toml index a54cf41d..00903b23 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -35,7 +35,7 @@ required-features = ["ballista/standalone"] [dependencies] ballista = { path = "../ballista/rust/client", version = "0.7.0" } -datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "844bcda2664a04685b865afe7ff159c0648d2860" } +datafusion = { git = "https://github.com/tustvold/arrow-datafusion", rev = "c2008c5f97d8d664bda496aacfec3686631cf905" } futures = "0.3" num_cpus = "1.13.0" prost = "0.10"
