This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 934b32fd Upgrade to DataFusion 34.0.0-rc1 (#927)
934b32fd is described below

commit 934b32fd7b485bdd48d74a7375ccf7ba03c188b5
Author: Andy Grove <[email protected]>
AuthorDate: Tue Dec 12 08:34:00 2023 -0700

    Upgrade to DataFusion 34.0.0-rc1 (#927)
    
    * Use latest DF
    
    * update deps
    
    * Set max encoding size
    
    * tests compile now
    
    * fix some todo comments
    
    * address another todo item
    
    * specify message sizes in gRPC clients as well
    
    * fix typo and use 34 rc1
---
 Cargo.toml                                         | 16 +++++-----
 ballista/client/src/context.rs                     |  2 --
 ballista/core/src/cache_layer/object_store/file.rs | 35 ++++++++++++++++++----
 ballista/core/src/cache_layer/object_store/mod.rs  | 24 +++++++++++----
 ballista/core/src/cache_layer/policy/file.rs       |  1 +
 ballista/core/src/utils.rs                         |  2 +-
 ballista/scheduler/src/cluster/mod.rs              |  1 +
 ballista/scheduler/src/scheduler_server/grpc.rs    |  6 ----
 benchmarks/src/bin/tpch.rs                         |  3 +-
 9 files changed, 60 insertions(+), 30 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index fcdebab6..57752844 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,16 +29,16 @@ members = [
 resolver = "2"
 
 [workspace.dependencies]
-arrow = { version = "48.0.0", features=["ipc_compression"] }
-arrow-flight = { version = "48.0.0", features = ["flight-sql-experimental"] }
-arrow-schema = { version = "48.0.0", default-features = false }
+arrow = { version = "49.0.0", features=["ipc_compression"] }
+arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] }
+arrow-schema = { version = "49.0.0", default-features = false }
 configure_me = { version = "0.4.0" }
 configure_me_codegen = { version = "0.4.4" }
-datafusion = "33.0.0"
-datafusion-cli = "33.0.0"
-datafusion-proto = "33.0.0"
-object_store = "0.7.0"
-sqlparser = "0.39.0"
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "34.0.0-rc1" }
+datafusion-cli = { git = "https://github.com/apache/arrow-datafusion", rev = "34.0.0-rc1" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "34.0.0-rc1" }
+object_store = "0.8.0"
+sqlparser = "0.40.0"
 tonic = { version = "0.10" }
 tonic-build = { version = "0.10", default-features = false, features = [
     "transport",
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index ea381559..a1c0d7e3 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -616,8 +616,6 @@ mod tests {
                         target_partitions: x.target_partitions,
                         file_sort_order: vec![],
                         infinite_source: false,
-                        insert_mode:
-                            
datafusion::datasource::listing::ListingTableInsertMode::Error,
                         file_type_write_options: None,
                         single_file: false,
                     };
diff --git a/ballista/core/src/cache_layer/object_store/file.rs 
b/ballista/core/src/cache_layer/object_store/file.rs
index d52ca954..169d2b5c 100644
--- a/ballista/core/src/cache_layer/object_store/file.rs
+++ b/ballista/core/src/cache_layer/object_store/file.rs
@@ -22,11 +22,12 @@ use crate::error::BallistaError;
 use async_trait::async_trait;
 use ballista_cache::loading_cache::LoadingCache;
 use bytes::Bytes;
-use futures::stream::BoxStream;
+use futures::stream::{self, BoxStream, StreamExt};
 use log::info;
 use object_store::path::Path;
 use object_store::{
     Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, 
ObjectStore,
+    PutOptions, PutResult,
 };
 use std::fmt::{Debug, Display, Formatter};
 use std::ops::Range;
@@ -73,7 +74,24 @@ impl<M> ObjectStore for FileCacheObjectStore<M>
 where
     M: CacheMedium,
 {
-    async fn put(&self, _location: &Path, _bytes: Bytes) -> 
object_store::Result<()> {
+    async fn put(
+        &self,
+        _location: &Path,
+        _bytes: Bytes,
+    ) -> object_store::Result<PutResult> {
+        Err(Error::NotSupported {
+            source: Box::new(BallistaError::General(
+                "Write path is not supported".to_string(),
+            )),
+        })
+    }
+
+    async fn put_opts(
+        &self,
+        _location: &Path,
+        _bytes: Bytes,
+        _opts: PutOptions,
+    ) -> object_store::Result<PutResult> {
         Err(Error::NotSupported {
             source: Box::new(BallistaError::General(
                 "Write path is not supported".to_string(),
@@ -209,13 +227,18 @@ where
         })
     }
 
-    async fn list(
+    fn list(
         &self,
         _prefix: Option<&Path>,
-    ) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>> 
{
-        Err(Error::NotSupported {
-            source: Box::new(BallistaError::General("List is not 
supported".to_string())),
+    ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+        stream::once(async {
+            Err(Error::NotSupported {
+                source: Box::new(BallistaError::General(
+                    "List is not supported".to_string(),
+                )),
+            })
         })
+        .boxed()
     }
 
     async fn list_with_delimiter(
diff --git a/ballista/core/src/cache_layer/object_store/mod.rs 
b/ballista/core/src/cache_layer/object_store/mod.rs
index b90a741f..6d754eca 100644
--- a/ballista/core/src/cache_layer/object_store/mod.rs
+++ b/ballista/core/src/cache_layer/object_store/mod.rs
@@ -22,7 +22,8 @@ use bytes::Bytes;
 use futures::stream::BoxStream;
 use object_store::path::Path;
 use object_store::{
-    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
+    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, 
PutOptions,
+    PutResult,
 };
 use std::fmt::{Debug, Display, Formatter};
 use std::ops::Range;
@@ -60,10 +61,23 @@ impl Display for ObjectStoreWithKey {
 
 #[async_trait]
 impl ObjectStore for ObjectStoreWithKey {
-    async fn put(&self, location: &Path, bytes: Bytes) -> 
object_store::Result<()> {
+    async fn put(
+        &self,
+        location: &Path,
+        bytes: Bytes,
+    ) -> object_store::Result<PutResult> {
         self.inner.put(location, bytes).await
     }
 
+    async fn put_opts(
+        &self,
+        location: &Path,
+        bytes: Bytes,
+        opts: PutOptions,
+    ) -> object_store::Result<PutResult> {
+        self.inner.put_opts(location, bytes, opts).await
+    }
+
     async fn put_multipart(
         &self,
         location: &Path,
@@ -115,11 +129,11 @@ impl ObjectStore for ObjectStoreWithKey {
         self.inner.delete(location).await
     }
 
-    async fn list(
+    fn list(
         &self,
         prefix: Option<&Path>,
-    ) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>> 
{
-        self.inner.list(prefix).await
+    ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+        self.inner.list(prefix)
     }
 
     async fn list_with_delimiter(
diff --git a/ballista/core/src/cache_layer/policy/file.rs 
b/ballista/core/src/cache_layer/policy/file.rs
index 98d76a11..dfc6c83f 100644
--- a/ballista/core/src/cache_layer/policy/file.rs
+++ b/ballista/core/src/cache_layer/policy/file.rs
@@ -202,6 +202,7 @@ where
         last_modified: source_meta.last_modified,
         size: cache_meta.size,
         e_tag: source_meta.e_tag,
+        version: None,
     })
 }
 
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index e16c1b4c..a6541c08 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -319,7 +319,7 @@ impl<T: 'static + AsLogicalPlan> QueryPlanner for 
BallistaQueryPlanner<T> {
         match logical_plan {
             LogicalPlan::Ddl(DdlStatement::CreateExternalTable(_)) => {
                 // table state is managed locally in the BallistaContext, not 
in the scheduler
-                Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))))
+                Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))))
             }
             _ => Ok(Arc::new(DistributedQueryExec::with_repr(
                 self.scheduler_url.clone(),
diff --git a/ballista/scheduler/src/cluster/mod.rs 
b/ballista/scheduler/src/cluster/mod.rs
index 793d3fc1..f7e1ca46 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -1092,6 +1092,7 @@ mod test {
                     last_modified: Default::default(),
                     size: 1,
                     e_tag: None,
+                    version: None,
                 },
                 partition_values: vec![],
                 range: None,
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs 
b/ballista/scheduler/src/scheduler_server/grpc.rs
index 8257891c..c1ff35de 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -308,12 +308,6 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerGrpc
         let path = Path::from(path.as_str());
         let file_metas: Vec<_> = obj_store
             .list(Some(&path))
-            .await
-            .map_err(|e| {
-                let msg = format!("Error listing files: {e}");
-                error!("{}", msg);
-                tonic::Status::internal(msg)
-            })?
             .try_collect()
             .await
             .map_err(|e| {
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 189719c3..b2863a41 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -26,7 +26,7 @@ use ballista::prelude::{
 use datafusion::arrow::array::*;
 use datafusion::arrow::util::display::array_value_to_string;
 use datafusion::common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};
-use datafusion::datasource::listing::{ListingTableInsertMode, ListingTableUrl};
+use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::SessionState;
@@ -845,7 +845,6 @@ async fn get_table(
         table_partition_cols: vec![],
         file_sort_order: vec![],
         infinite_source: false,
-        insert_mode: ListingTableInsertMode::Error,
         file_type_write_options: None,
         single_file: false,
     };

Reply via email to