This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 934b32fd Upgrade to DataFusion 34.0.0-rc1 (#927)
934b32fd is described below
commit 934b32fd7b485bdd48d74a7375ccf7ba03c188b5
Author: Andy Grove <[email protected]>
AuthorDate: Tue Dec 12 08:34:00 2023 -0700
Upgrade to DataFusion 34.0.0-rc1 (#927)
* Use latest DF
* update deps
* Set max encoding size
* tests compile now
* fix some todo comments
* address another todo item
* specify message sizes in gRPC clients as well
* fix typo and use 34 rc1
---
Cargo.toml | 16 +++++-----
ballista/client/src/context.rs | 2 --
ballista/core/src/cache_layer/object_store/file.rs | 35 ++++++++++++++++++----
ballista/core/src/cache_layer/object_store/mod.rs | 24 +++++++++++----
ballista/core/src/cache_layer/policy/file.rs | 1 +
ballista/core/src/utils.rs | 2 +-
ballista/scheduler/src/cluster/mod.rs | 1 +
ballista/scheduler/src/scheduler_server/grpc.rs | 6 ----
benchmarks/src/bin/tpch.rs | 3 +-
9 files changed, 60 insertions(+), 30 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index fcdebab6..57752844 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,16 +29,16 @@ members = [
resolver = "2"
[workspace.dependencies]
-arrow = { version = "48.0.0", features=["ipc_compression"] }
-arrow-flight = { version = "48.0.0", features = ["flight-sql-experimental"] }
-arrow-schema = { version = "48.0.0", default-features = false }
+arrow = { version = "49.0.0", features=["ipc_compression"] }
+arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] }
+arrow-schema = { version = "49.0.0", default-features = false }
configure_me = { version = "0.4.0" }
configure_me_codegen = { version = "0.4.4" }
-datafusion = "33.0.0"
-datafusion-cli = "33.0.0"
-datafusion-proto = "33.0.0"
-object_store = "0.7.0"
-sqlparser = "0.39.0"
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "34.0.0-rc1" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "34.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "34.0.0-rc1" }
+object_store = "0.8.0"
+sqlparser = "0.40.0"
tonic = { version = "0.10" }
tonic-build = { version = "0.10", default-features = false, features = [
"transport",
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index ea381559..a1c0d7e3 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -616,8 +616,6 @@ mod tests {
target_partitions: x.target_partitions,
file_sort_order: vec![],
infinite_source: false,
- insert_mode:
- datafusion::datasource::listing::ListingTableInsertMode::Error,
file_type_write_options: None,
single_file: false,
};
diff --git a/ballista/core/src/cache_layer/object_store/file.rs b/ballista/core/src/cache_layer/object_store/file.rs
index d52ca954..169d2b5c 100644
--- a/ballista/core/src/cache_layer/object_store/file.rs
+++ b/ballista/core/src/cache_layer/object_store/file.rs
@@ -22,11 +22,12 @@ use crate::error::BallistaError;
use async_trait::async_trait;
use ballista_cache::loading_cache::LoadingCache;
use bytes::Bytes;
-use futures::stream::BoxStream;
+use futures::stream::{self, BoxStream, StreamExt};
use log::info;
use object_store::path::Path;
use object_store::{
Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
+ PutOptions, PutResult,
};
use std::fmt::{Debug, Display, Formatter};
use std::ops::Range;
@@ -73,7 +74,24 @@ impl<M> ObjectStore for FileCacheObjectStore<M>
where
M: CacheMedium,
{
- async fn put(&self, _location: &Path, _bytes: Bytes) -> object_store::Result<()> {
+ async fn put(
+ &self,
+ _location: &Path,
+ _bytes: Bytes,
+ ) -> object_store::Result<PutResult> {
+ Err(Error::NotSupported {
+ source: Box::new(BallistaError::General(
+ "Write path is not supported".to_string(),
+ )),
+ })
+ }
+
+ async fn put_opts(
+ &self,
+ _location: &Path,
+ _bytes: Bytes,
+ _opts: PutOptions,
+ ) -> object_store::Result<PutResult> {
Err(Error::NotSupported {
source: Box::new(BallistaError::General(
"Write path is not supported".to_string(),
@@ -209,13 +227,18 @@ where
})
}
- async fn list(
+ fn list(
&self,
_prefix: Option<&Path>,
- ) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>> {
- Err(Error::NotSupported {
- source: Box::new(BallistaError::General("List is not supported".to_string())),
+ ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+ stream::once(async {
+ Err(Error::NotSupported {
+ source: Box::new(BallistaError::General(
+ "List is not supported".to_string(),
+ )),
+ })
})
+ .boxed()
}
async fn list_with_delimiter(
diff --git a/ballista/core/src/cache_layer/object_store/mod.rs b/ballista/core/src/cache_layer/object_store/mod.rs
index b90a741f..6d754eca 100644
--- a/ballista/core/src/cache_layer/object_store/mod.rs
+++ b/ballista/core/src/cache_layer/object_store/mod.rs
@@ -22,7 +22,8 @@ use bytes::Bytes;
use futures::stream::BoxStream;
use object_store::path::Path;
use object_store::{
- GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
+ GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions,
+ PutResult,
};
use std::fmt::{Debug, Display, Formatter};
use std::ops::Range;
@@ -60,10 +61,23 @@ impl Display for ObjectStoreWithKey {
#[async_trait]
impl ObjectStore for ObjectStoreWithKey {
- async fn put(&self, location: &Path, bytes: Bytes) -> object_store::Result<()> {
+ async fn put(
+ &self,
+ location: &Path,
+ bytes: Bytes,
+ ) -> object_store::Result<PutResult> {
self.inner.put(location, bytes).await
}
+ async fn put_opts(
+ &self,
+ location: &Path,
+ bytes: Bytes,
+ opts: PutOptions,
+ ) -> object_store::Result<PutResult> {
+ self.inner.put_opts(location, bytes, opts).await
+ }
+
async fn put_multipart(
&self,
location: &Path,
@@ -115,11 +129,11 @@ impl ObjectStore for ObjectStoreWithKey {
self.inner.delete(location).await
}
- async fn list(
+ fn list(
&self,
prefix: Option<&Path>,
- ) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>> {
- self.inner.list(prefix).await
+ ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+ self.inner.list(prefix)
}
async fn list_with_delimiter(
diff --git a/ballista/core/src/cache_layer/policy/file.rs b/ballista/core/src/cache_layer/policy/file.rs
index 98d76a11..dfc6c83f 100644
--- a/ballista/core/src/cache_layer/policy/file.rs
+++ b/ballista/core/src/cache_layer/policy/file.rs
@@ -202,6 +202,7 @@ where
last_modified: source_meta.last_modified,
size: cache_meta.size,
e_tag: source_meta.e_tag,
+ version: None,
})
}
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index e16c1b4c..a6541c08 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -319,7 +319,7 @@ impl<T: 'static + AsLogicalPlan> QueryPlanner for BallistaQueryPlanner<T> {
match logical_plan {
LogicalPlan::Ddl(DdlStatement::CreateExternalTable(_)) => {
// table state is managed locally in the BallistaContext, not in the scheduler
- Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))))
+ Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))))
}
_ => Ok(Arc::new(DistributedQueryExec::with_repr(
self.scheduler_url.clone(),
diff --git a/ballista/scheduler/src/cluster/mod.rs b/ballista/scheduler/src/cluster/mod.rs
index 793d3fc1..f7e1ca46 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -1092,6 +1092,7 @@ mod test {
last_modified: Default::default(),
size: 1,
e_tag: None,
+ version: None,
},
partition_values: vec![],
range: None,
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index 8257891c..c1ff35de 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -308,12 +308,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
let path = Path::from(path.as_str());
let file_metas: Vec<_> = obj_store
.list(Some(&path))
- .await
- .map_err(|e| {
- let msg = format!("Error listing files: {e}");
- error!("{}", msg);
- tonic::Status::internal(msg)
- })?
.try_collect()
.await
.map_err(|e| {
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 189719c3..b2863a41 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -26,7 +26,7 @@ use ballista::prelude::{
use datafusion::arrow::array::*;
use datafusion::arrow::util::display::array_value_to_string;
use datafusion::common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};
-use datafusion::datasource::listing::{ListingTableInsertMode, ListingTableUrl};
+use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionState;
@@ -845,7 +845,6 @@ async fn get_table(
table_partition_cols: vec![],
file_sort_order: vec![],
infinite_source: false,
- insert_mode: ListingTableInsertMode::Error,
file_type_write_options: None,
single_file: false,
};