This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new fe8ab01b29 make datafusion-catalog-listing and move some implementation of listing out of datafusion/core/datasource/listing (#14464)
fe8ab01b29 is described below

commit fe8ab01b299c2431e61d055c9b44f24c1e8dc15c
Author: logan-keede <[email protected]>
AuthorDate: Wed Feb 5 19:29:57 2025 +0530

    make datafusion-catalog-listing and move some implementation of listing out of datafusion/core/datasource/listing (#14464)
    
    * make datafusion_catalog_listing
    
    * fix: this is a bit hacky
    
    * fixes: prettier, taplo etc
    
    * fixes: clippy
    
    * minor: permalink commit hash -> main
    
    * Tweak README
    
    * fix:prettier + wasm
    
    * prettier
    
    * Put unit tests with code
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 Cargo.toml                                         |   2 +
 datafusion-cli/Cargo.lock                          |  23 ++
 datafusion/catalog-listing/Cargo.toml              |  65 ++++++
 datafusion/catalog-listing/LICENSE.txt             |   1 +
 datafusion/catalog-listing/NOTICE.txt              |   1 +
 datafusion/catalog-listing/README.md               |  30 +++
 .../listing => catalog-listing/src}/helpers.rs     | 130 ++++++++---
 .../listing => catalog-listing/src}/mod.rs         |  12 +-
 .../listing => catalog-listing/src}/url.rs         |  13 +-
 datafusion/core/Cargo.toml                         |   1 +
 datafusion/core/src/datasource/file_format/mod.rs  |   2 +
 datafusion/core/src/datasource/listing/mod.rs      | 259 +--------------------
 datafusion/core/src/lib.rs                         |   3 +-
 datafusion/core/src/test/mod.rs                    |   4 +-
 14 files changed, 245 insertions(+), 301 deletions(-)
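
The commit splits the file-listing primitives out of datafusion/core into a new datafusion-catalog-listing crate, so downstream code can depend on them without pulling in all of core. A minimal sketch of what that could look like, assuming only the new crate as a dependency (the crate name and version mirror the workspace entries added below, and the calls mirror the doctest in url.rs; treat this as an illustration, not an official example):

    // Cargo.toml of a hypothetical downstream crate:
    //   datafusion-catalog-listing = "45.0.0"
    use datafusion_catalog_listing::ListingTableUrl;

    fn main() {
        // A file URL reports its extension, as in the doctest moved by this commit.
        let url = ListingTableUrl::parse("file:///data/mytable/part-0.csv").unwrap();
        assert_eq!(url.file_extension(), Some("csv"));

        // A trailing slash marks a directory ("collection"), which has no extension.
        let dir = ListingTableUrl::parse("file:///data/mytable/").unwrap();
        assert_eq!(dir.file_extension(), None);
    }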

diff --git a/Cargo.toml b/Cargo.toml
index c1f8a604dd..0952c17887 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,6 +22,7 @@ members = [
     "datafusion/common",
     "datafusion/common-runtime",
     "datafusion/catalog",
+    "datafusion/catalog-listing",
     "datafusion/core",
     "datafusion/expr",
     "datafusion/expr-common",
@@ -100,6 +101,7 @@ ctor = "0.2.9"
 dashmap = "6.0.1"
 datafusion = { path = "datafusion/core", version = "45.0.0", default-features = false }
 datafusion-catalog = { path = "datafusion/catalog", version = "45.0.0" }
+datafusion-catalog-listing = { path = "datafusion/catalog-listing", version = "45.0.0" }
 datafusion-common = { path = "datafusion/common", version = "45.0.0", default-features = false }
 datafusion-common-runtime = { path = "datafusion/common-runtime", version = "45.0.0" }
 datafusion-doc = { path = "datafusion/doc", version = "45.0.0" }
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index bcbee29d6b..d1107d2a71 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1219,6 +1219,7 @@ dependencies = [
  "bzip2 0.5.0",
  "chrono",
  "datafusion-catalog",
+ "datafusion-catalog-listing",
  "datafusion-common",
  "datafusion-common-runtime",
  "datafusion-execution",
@@ -1274,6 +1275,28 @@ dependencies = [
  "sqlparser",
 ]
 
+[[package]]
+name = "datafusion-catalog-listing"
+version = "45.0.0"
+dependencies = [
+ "arrow",
+ "arrow-schema",
+ "chrono",
+ "datafusion-catalog",
+ "datafusion-common",
+ "datafusion-execution",
+ "datafusion-expr",
+ "datafusion-physical-expr",
+ "datafusion-physical-expr-common",
+ "datafusion-physical-plan",
+ "futures",
+ "glob",
+ "itertools 0.14.0",
+ "log",
+ "object_store",
+ "url",
+]
+
 [[package]]
 name = "datafusion-cli"
 version = "45.0.0"
diff --git a/datafusion/catalog-listing/Cargo.toml b/datafusion/catalog-listing/Cargo.toml
new file mode 100644
index 0000000000..03132e7b7b
--- /dev/null
+++ b/datafusion/catalog-listing/Cargo.toml
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "datafusion-catalog-listing"
+description = "datafusion-catalog-listing"
+authors.workspace = true
+edition.workspace = true
+homepage.workspace = true
+license.workspace = true
+readme.workspace = true
+repository.workspace = true
+rust-version.workspace = true
+version.workspace = true
+
+[dependencies]
+arrow = { workspace = true }
+arrow-schema = { workspace = true }
+async-compression = { version = "0.4.0", features = [
+    "bzip2",
+    "gzip",
+    "xz",
+    "zstd",
+    "tokio",
+], optional = true }
+chrono = { workspace = true }
+datafusion-catalog = { workspace = true }
+datafusion-common = { workspace = true, features = ["object_store"] }
+datafusion-execution = { workspace = true }
+datafusion-expr = { workspace = true }
+datafusion-physical-expr = { workspace = true }
+datafusion-physical-expr-common = { workspace = true }
+datafusion-physical-plan = { workspace = true }
+futures = { workspace = true }
+glob = "0.3.0"
+itertools = { workspace = true }
+log = { workspace = true }
+object_store = { workspace = true }
+url = { workspace = true }
+
+[dev-dependencies]
+async-trait = { workspace = true }
+tempfile = { workspace = true }
+tokio = { workspace = true }
+
+[lints]
+workspace = true
+
+[lib]
+name = "datafusion_catalog_listing"
+path = "src/mod.rs"
diff --git a/datafusion/catalog-listing/LICENSE.txt b/datafusion/catalog-listing/LICENSE.txt
new file mode 120000
index 0000000000..1ef648f64b
--- /dev/null
+++ b/datafusion/catalog-listing/LICENSE.txt
@@ -0,0 +1 @@
+../../LICENSE.txt
\ No newline at end of file
diff --git a/datafusion/catalog-listing/NOTICE.txt b/datafusion/catalog-listing/NOTICE.txt
new file mode 120000
index 0000000000..fb051c92b1
--- /dev/null
+++ b/datafusion/catalog-listing/NOTICE.txt
@@ -0,0 +1 @@
+../../NOTICE.txt
\ No newline at end of file
diff --git a/datafusion/catalog-listing/README.md b/datafusion/catalog-listing/README.md
new file mode 100644
index 0000000000..b4760c413d
--- /dev/null
+++ b/datafusion/catalog-listing/README.md
@@ -0,0 +1,30 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# DataFusion catalog-listing
+
+[DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.
+
+This crate is a submodule of DataFusion with [ListingTable], an implementation
+of [TableProvider] based on files in a directory (either locally or on remote
+object storage such as S3).
+
+[df]: https://crates.io/crates/datafusion
+[listingtable]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
+[tableprovider]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html
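
To make the README's description concrete, here is a hedged sketch of registering a ListingTable over a local directory through the datafusion crate (ListingTable itself stays in datafusion/core in this commit; the builder and schema-inference calls follow the datafusion 45 API as I understand it, so verify against the docs before relying on them):

    use std::sync::Arc;

    use datafusion::datasource::file_format::parquet::ParquetFormat;
    use datafusion::datasource::listing::{
        ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
    };
    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    async fn register_listing_table(ctx: &SessionContext) -> Result<()> {
        let url = ListingTableUrl::parse("file:///data/mytable/")?;
        let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
            .with_file_extension(".parquet");
        // Infer the table schema by sampling the files found under the URL.
        let config = ListingTableConfig::new(url)
            .with_listing_options(options)
            .infer_schema(&ctx.state())
            .await?;
        let table = ListingTable::try_new(config)?;
        ctx.register_table("mytable", Arc::new(table))?;
        Ok(())
    }
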
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/catalog-listing/src/helpers.rs
similarity index 91%
rename from datafusion/core/src/datasource/listing/helpers.rs
rename to datafusion/catalog-listing/src/helpers.rs
index 228b9a4e9f..6cb3f661e6 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/catalog-listing/src/helpers.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
 
 use super::ListingTableUrl;
 use super::PartitionedFile;
-use crate::execution::context::SessionState;
+use datafusion_catalog::Session;
 use datafusion_common::internal_err;
 use datafusion_common::{HashMap, Result, ScalarValue};
 use datafusion_expr::{BinaryExpr, Operator};
@@ -154,7 +154,7 @@ pub fn split_files(
     chunks
 }
 
-struct Partition {
+pub struct Partition {
     /// The path to the partition, including the table prefix
     path: Path,
     /// How many path segments below the table prefix `path` contains
@@ -183,7 +183,7 @@ impl Partition {
 }
 
 /// Returns a recursive list of the partitions in `table_path` up to `max_depth`
-async fn list_partitions(
+pub async fn list_partitions(
     store: &dyn ObjectStore,
     table_path: &ListingTableUrl,
     max_depth: usize,
@@ -364,7 +364,7 @@ fn populate_partition_values<'a>(
     }
 }
 
-fn evaluate_partition_prefix<'a>(
+pub fn evaluate_partition_prefix<'a>(
     partition_cols: &'a [(String, DataType)],
     filters: &'a [Expr],
 ) -> Option<Path> {
@@ -405,7 +405,7 @@ fn evaluate_partition_prefix<'a>(
 /// `filters` should only contain expressions that can be evaluated
 /// using only the partition columns.
 pub async fn pruned_partition_list<'a>(
-    ctx: &'a SessionState,
+    ctx: &'a dyn Session,
     store: &'a dyn ObjectStore,
     table_path: &'a ListingTableUrl,
     filters: &'a [Expr],
@@ -489,7 +489,7 @@ pub async fn pruned_partition_list<'a>(
 
 /// Extract the partition values for the given `file_path` (in the given `table_path`)
 /// associated to the partitions defined by `table_partition_cols`
-fn parse_partitions_for_path<'a, I>(
+pub fn parse_partitions_for_path<'a, I>(
     table_path: &ListingTableUrl,
     file_path: &'a Path,
     table_partition_cols: I,
@@ -517,17 +517,36 @@ where
     }
     Some(part_values)
 }
+/// Describe a partition as a (path, depth, files) tuple for easier assertions
+pub fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {
+    (
+        partition.path.as_ref(),
+        partition.depth,
+        partition
+            .files
+            .as_ref()
+            .map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect())
+            .unwrap_or_default(),
+    )
+}
 
 #[cfg(test)]
 mod tests {
+    use async_trait::async_trait;
+    use datafusion_execution::config::SessionConfig;
+    use datafusion_execution::runtime_env::RuntimeEnv;
+    use futures::FutureExt;
+    use object_store::memory::InMemory;
+    use std::any::Any;
     use std::ops::Not;
-
-    use futures::StreamExt;
-
-    use crate::test::object_store::make_test_store_and_state;
-    use datafusion_expr::{case, col, lit, Expr};
+    // use futures::StreamExt;
 
     use super::*;
+    use datafusion_expr::{
+        case, col, lit, AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF,
+    };
+    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+    use datafusion_physical_plan::ExecutionPlan;
 
     #[test]
     fn test_split_files() {
@@ -578,7 +597,7 @@ mod tests {
         ]);
         let filter = Expr::eq(col("mypartition"), lit("val1"));
         let pruned = pruned_partition_list(
-            &state,
+            state.as_ref(),
             store.as_ref(),
             &ListingTableUrl::parse("file:///tablepath/").unwrap(),
             &[filter],
@@ -603,7 +622,7 @@ mod tests {
         ]);
         let filter = Expr::eq(col("mypartition"), lit("val1"));
         let pruned = pruned_partition_list(
-            &state,
+            state.as_ref(),
             store.as_ref(),
             &ListingTableUrl::parse("file:///tablepath/").unwrap(),
             &[filter],
@@ -643,7 +662,7 @@ mod tests {
         let filter1 = Expr::eq(col("part1"), lit("p1v2"));
         let filter2 = Expr::eq(col("part2"), lit("p2v1"));
         let pruned = pruned_partition_list(
-            &state,
+            state.as_ref(),
             store.as_ref(),
             &ListingTableUrl::parse("file:///tablepath/").unwrap(),
             &[filter1, filter2],
@@ -680,19 +699,6 @@ mod tests {
         );
     }
 
-    /// Describe a partition as a (path, depth, files) tuple for easier assertions
-    fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {
-        (
-            partition.path.as_ref(),
-            partition.depth,
-            partition
-                .files
-                .as_ref()
-                .map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect())
-                .unwrap_or_default(),
-        )
-    }
-
     #[tokio::test]
     async fn test_list_partition() {
         let (store, _) = make_test_store_and_state(&[
@@ -994,4 +1000,74 @@ mod tests {
             Some(Path::from("a=1970-01-05")),
         );
     }
+
+    pub fn make_test_store_and_state(
+        files: &[(&str, u64)],
+    ) -> (Arc<InMemory>, Arc<dyn Session>) {
+        let memory = InMemory::new();
+
+        for (name, size) in files {
+            memory
+                .put(&Path::from(*name), vec![0; *size as usize].into())
+                .now_or_never()
+                .unwrap()
+                .unwrap();
+        }
+
+        (Arc::new(memory), Arc::new(MockSession {}))
+    }
+
+    struct MockSession {}
+
+    #[async_trait]
+    impl Session for MockSession {
+        fn session_id(&self) -> &str {
+            unimplemented!()
+        }
+
+        fn config(&self) -> &SessionConfig {
+            unimplemented!()
+        }
+
+        async fn create_physical_plan(
+            &self,
+            _logical_plan: &LogicalPlan,
+        ) -> Result<Arc<dyn ExecutionPlan>> {
+            unimplemented!()
+        }
+
+        fn create_physical_expr(
+            &self,
+            _expr: Expr,
+            _df_schema: &DFSchema,
+        ) -> Result<Arc<dyn PhysicalExpr>> {
+            unimplemented!()
+        }
+
+        fn scalar_functions(&self) -> &std::collections::HashMap<String, Arc<ScalarUDF>> {
+            unimplemented!()
+        }
+
+        fn aggregate_functions(
+            &self,
+        ) -> &std::collections::HashMap<String, Arc<AggregateUDF>> {
+            unimplemented!()
+        }
+
+        fn window_functions(&self) -> &std::collections::HashMap<String, Arc<WindowUDF>> {
+            unimplemented!()
+        }
+
+        fn runtime_env(&self) -> &Arc<RuntimeEnv> {
+            unimplemented!()
+        }
+
+        fn execution_props(&self) -> &ExecutionProps {
+            unimplemented!()
+        }
+
+        fn as_any(&self) -> &dyn Any {
+            unimplemented!()
+        }
+    }
 }
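
Moving these unit tests in-crate required the MockSession above, because SessionState still lives in datafusion/core and only the Session trait is visible here. Condensing the test bodies, the helpers combine roughly like this (a sketch that reuses make_test_store_and_state from the test module; the argument order follows the calls shown above):

    use arrow::datatypes::DataType;
    use datafusion_expr::{col, lit, Expr};
    use futures::StreamExt;

    async fn pruned_file_count() -> usize {
        // Two hive-style partitions; the filter should keep only `val1`.
        let (store, state) = make_test_store_and_state(&[
            ("tablepath/mypartition=val1/file.parquet", 100),
            ("tablepath/mypartition=val2/file.parquet", 100),
        ]);
        let filter = Expr::eq(col("mypartition"), lit("val1"));
        let pruned = pruned_partition_list(
            state.as_ref(),
            store.as_ref(),
            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
            &[filter],
            ".parquet",
            &[(String::from("mypartition"), DataType::Utf8)],
        )
        .await
        .unwrap();
        // Only the file under mypartition=val1 should survive pruning.
        pruned.collect::<Vec<_>>().await.len()
    }
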
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/catalog-listing/src/mod.rs
similarity index 95%
copy from datafusion/core/src/datasource/listing/mod.rs
copy to datafusion/catalog-listing/src/mod.rs
index f11653ce1e..e952e39fd4 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/catalog-listing/src/mod.rs
@@ -18,9 +18,8 @@
 //! A table that uses the `ObjectStore` listing capability
 //! to get the list of files to process.
 
-mod helpers;
-mod table;
-mod url;
+pub mod helpers;
+pub mod url;
 
 use chrono::TimeZone;
 use datafusion_common::Result;
@@ -31,7 +30,6 @@ use std::pin::Pin;
 use std::sync::Arc;
 
 pub use self::url::ListingTableUrl;
-pub use table::{ListingOptions, ListingTable, ListingTableConfig};
 
 /// Stream of files get listed from object store
 pub type PartitionedFileStream =
@@ -68,9 +66,9 @@ pub struct PartitionedFile {
     /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type.
     ///
     ///
-    /// [`wrap_partition_type_in_dict`]: crate::datasource::physical_plan::wrap_partition_type_in_dict
-    /// [`wrap_partition_value_in_dict`]: crate::datasource::physical_plan::wrap_partition_value_in_dict
-    /// [`table_partition_cols`]: table::ListingOptions::table_partition_cols
+    /// [`wrap_partition_type_in_dict`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L55
+    /// [`wrap_partition_value_in_dict`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L62
+    /// [`table_partition_cols`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/file_format/options.rs#L190
     pub partition_values: Vec<ScalarValue>,
     /// An optional file range for a more fine-grained parallel execution
     pub range: Option<FileRange>,
diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/catalog-listing/src/url.rs
similarity index 98%
rename from datafusion/core/src/datasource/listing/url.rs
rename to datafusion/catalog-listing/src/url.rs
index 6fb536ca2f..2e6415ba3b 100644
--- a/datafusion/core/src/datasource/listing/url.rs
+++ b/datafusion/catalog-listing/src/url.rs
@@ -15,10 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::execution::context::SessionState;
+use datafusion_catalog::Session;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_execution::object_store::ObjectStoreUrl;
-use datafusion_optimizer::OptimizerConfig;
 use futures::stream::BoxStream;
 use futures::{StreamExt, TryStreamExt};
 use glob::Pattern;
@@ -194,7 +193,7 @@ impl ListingTableUrl {
     ///
     /// Examples:
     /// ```rust
-    /// use datafusion::datasource::listing::ListingTableUrl;
+    /// use datafusion_catalog_listing::ListingTableUrl;
     /// let url = ListingTableUrl::parse("file:///foo/bar.csv").unwrap();
     /// assert_eq!(url.file_extension(), Some("csv"));
     /// let url = ListingTableUrl::parse("file:///foo/bar").unwrap();
@@ -216,7 +215,7 @@ impl ListingTableUrl {
 
     /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
     /// an iterator of the remaining path segments
-    pub(crate) fn strip_prefix<'a, 'b: 'a>(
+    pub fn strip_prefix<'a, 'b: 'a>(
         &'a self,
         path: &'b Path,
     ) -> Option<impl Iterator<Item = &'b str> + 'a> {
@@ -230,11 +229,11 @@ impl ListingTableUrl {
     /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
     pub async fn list_all_files<'a>(
         &'a self,
-        ctx: &'a SessionState,
+        ctx: &'a dyn Session,
         store: &'a dyn ObjectStore,
         file_extension: &'a str,
     ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
-        let exec_options = &ctx.options().execution;
+        let exec_options = &ctx.config_options().execution;
         let ignore_subdirectory = exec_options.listing_table_ignore_subdirectory;
         // If the prefix is a file, use a head request, otherwise list
         let list = match self.is_collection() {
@@ -325,6 +324,7 @@ impl std::fmt::Display for ListingTableUrl {
     }
 }
 
+#[cfg(not(target_arch = "wasm32"))]
 const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
 
 /// Splits `path` at the first path segment containing a glob expression, returning
@@ -333,6 +333,7 @@ const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
 /// Path delimiters are determined using [`std::path::is_separator`] which
 /// permits `/` as a path delimiter even on Windows platforms.
 ///
+#[cfg(not(target_arch = "wasm32"))]
 fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
     let mut last_separator = 0;
 
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index b708c18f5b..815191fd3c 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -100,6 +100,7 @@ bytes = { workspace = true }
 bzip2 = { version = "0.5.0", optional = true }
 chrono = { workspace = true }
 datafusion-catalog = { workspace = true }
+datafusion-catalog-listing = { workspace = true }
 datafusion-common = { workspace = true, features = ["object_store"] }
 datafusion-common-runtime = { workspace = true }
 datafusion-execution = { workspace = true }
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index f47e2107ad..2e2e6dba1c 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -425,6 +425,7 @@ pub fn transform_schema_to_view(schema: &Schema) -> Schema {
 }
 
 /// Coerces the file schema if the table schema uses a view type.
+#[cfg(not(target_arch = "wasm32"))]
 pub(crate) fn coerce_file_schema_to_view_type(
     table_schema: &Schema,
     file_schema: &Schema,
@@ -489,6 +490,7 @@ pub fn transform_binary_to_string(schema: &Schema) -> Schema {
 /// If the table schema uses a string type, coerce the file schema to use a string type.
 ///
 /// See [parquet::ParquetFormat::binary_as_string] for details
+#[cfg(not(target_arch = "wasm32"))]
 pub(crate) fn coerce_file_schema_to_string_type(
     table_schema: &Schema,
     file_schema: &Schema,
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index f11653ce1e..39323b993d 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -18,263 +18,6 @@
 //! A table that uses the `ObjectStore` listing capability
 //! to get the list of files to process.
 
-mod helpers;
 mod table;
-mod url;
-
-use chrono::TimeZone;
-use datafusion_common::Result;
-use datafusion_common::{ScalarValue, Statistics};
-use futures::Stream;
-use object_store::{path::Path, ObjectMeta};
-use std::pin::Pin;
-use std::sync::Arc;
-
-pub use self::url::ListingTableUrl;
+pub use datafusion_catalog_listing::*;
 pub use table::{ListingOptions, ListingTable, ListingTableConfig};
-
-/// Stream of files get listed from object store
-pub type PartitionedFileStream =
-    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
-
-/// Only scan a subset of Row Groups from the Parquet file whose data "midpoint"
-/// lies within the [start, end) byte offsets. This option can be used to scan non-overlapping
-/// sections of a Parquet file in parallel.
-#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
-pub struct FileRange {
-    /// Range start
-    pub start: i64,
-    /// Range end
-    pub end: i64,
-}
-
-impl FileRange {
-    /// returns true if this file range contains the specified offset
-    pub fn contains(&self, offset: i64) -> bool {
-        offset >= self.start && offset < self.end
-    }
-}
-
-#[derive(Debug, Clone)]
-/// A single file or part of a file that should be read, along with its schema, statistics
-/// and partition column values that need to be appended to each row.
-pub struct PartitionedFile {
-    /// Path for the file (e.g. URL, filesystem path, etc)
-    pub object_meta: ObjectMeta,
-    /// Values of partition columns to be appended to each row.
-    ///
-    /// These MUST have the same count, order, and type than the [`table_partition_cols`].
-    ///
-    /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type.
-    ///
-    ///
-    /// [`wrap_partition_type_in_dict`]: crate::datasource::physical_plan::wrap_partition_type_in_dict
-    /// [`wrap_partition_value_in_dict`]: crate::datasource::physical_plan::wrap_partition_value_in_dict
-    /// [`table_partition_cols`]: table::ListingOptions::table_partition_cols
-    pub partition_values: Vec<ScalarValue>,
-    /// An optional file range for a more fine-grained parallel execution
-    pub range: Option<FileRange>,
-    /// Optional statistics that describe the data in this file if known.
-    ///
-    /// DataFusion relies on these statistics for planning (in particular to sort file groups),
-    /// so if they are incorrect, incorrect answers may result.
-    pub statistics: Option<Statistics>,
-    /// An optional field for user defined per object metadata
-    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
-    /// The estimated size of the parquet metadata, in bytes
-    pub metadata_size_hint: Option<usize>,
-}
-
-impl PartitionedFile {
-    /// Create a simple file without metadata or partition
-    pub fn new(path: impl Into<String>, size: u64) -> Self {
-        Self {
-            object_meta: ObjectMeta {
-                location: Path::from(path.into()),
-                last_modified: chrono::Utc.timestamp_nanos(0),
-                size: size as usize,
-                e_tag: None,
-                version: None,
-            },
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        }
-    }
-
-    /// Create a file range without metadata or partition
-    pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
-        Self {
-            object_meta: ObjectMeta {
-                location: Path::from(path),
-                last_modified: chrono::Utc.timestamp_nanos(0),
-                size: size as usize,
-                e_tag: None,
-                version: None,
-            },
-            partition_values: vec![],
-            range: Some(FileRange { start, end }),
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        }
-        .with_range(start, end)
-    }
-
-    /// Provide a hint to the size of the file metadata. If a hint is provided
-    /// the reader will try and fetch the last `size_hint` bytes of the parquet file optimistically.
-    /// Without an appropriate hint, two read may be required to fetch the metadata.
-    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
-        self.metadata_size_hint = Some(metadata_size_hint);
-        self
-    }
-
-    /// Return a file reference from the given path
-    pub fn from_path(path: String) -> Result<Self> {
-        let size = std::fs::metadata(path.clone())?.len();
-        Ok(Self::new(path, size))
-    }
-
-    /// Return the path of this partitioned file
-    pub fn path(&self) -> &Path {
-        &self.object_meta.location
-    }
-
-    /// Update the file to only scan the specified range (in bytes)
-    pub fn with_range(mut self, start: i64, end: i64) -> Self {
-        self.range = Some(FileRange { start, end });
-        self
-    }
-
-    /// Update the user defined extensions for this file.
-    ///
-    /// This can be used to pass reader specific information.
-    pub fn with_extensions(
-        mut self,
-        extensions: Arc<dyn std::any::Any + Send + Sync>,
-    ) -> Self {
-        self.extensions = Some(extensions);
-        self
-    }
-}
-
-impl From<ObjectMeta> for PartitionedFile {
-    fn from(object_meta: ObjectMeta) -> Self {
-        PartitionedFile {
-            object_meta,
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::ListingTableUrl;
-    use datafusion_execution::object_store::{
-        DefaultObjectStoreRegistry, ObjectStoreRegistry,
-    };
-    use object_store::{local::LocalFileSystem, path::Path};
-    use std::{ops::Not, sync::Arc};
-    use url::Url;
-
-    #[test]
-    fn test_object_store_listing_url() {
-        let listing = ListingTableUrl::parse("file:///").unwrap();
-        let store = listing.object_store();
-        assert_eq!(store.as_str(), "file:///");
-
-        let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
-        let store = listing.object_store();
-        assert_eq!(store.as_str(), "s3://bucket/");
-    }
-
-    #[test]
-    fn test_get_store_hdfs() {
-        let sut = DefaultObjectStoreRegistry::default();
-        let url = Url::parse("hdfs://localhost:8020").unwrap();
-        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
-        let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
-        sut.get_store(url.as_ref()).unwrap();
-    }
-
-    #[test]
-    fn test_get_store_s3() {
-        let sut = DefaultObjectStoreRegistry::default();
-        let url = Url::parse("s3://bucket/key").unwrap();
-        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
-        let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
-        sut.get_store(url.as_ref()).unwrap();
-    }
-
-    #[test]
-    fn test_get_store_file() {
-        let sut = DefaultObjectStoreRegistry::default();
-        let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
-        sut.get_store(url.as_ref()).unwrap();
-    }
-
-    #[test]
-    fn test_get_store_local() {
-        let sut = DefaultObjectStoreRegistry::default();
-        let url = ListingTableUrl::parse("../").unwrap();
-        sut.get_store(url.as_ref()).unwrap();
-    }
-
-    #[test]
-    fn test_url_contains() {
-        let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();
-
-        // standard case with default config
-        assert!(url.contains(
-            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
-            true
-        ));
-
-        // standard case with `ignore_subdirectory` set to false
-        assert!(url.contains(
-            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
-            false
-        ));
-
-        // as per documentation, when `ignore_subdirectory` is true, we should ignore files that aren't
-        // a direct child of the `url`
-        assert!(url
-            .contains(
-                &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
-                true
-            )
-            .not());
-
-        // when we set `ignore_subdirectory` to false, we should not ignore the file
-        assert!(url.contains(
-            &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
-            false
-        ));
-
-        // as above, `ignore_subdirectory` is false, so we include the file
-        assert!(url.contains(
-            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
-            false
-        ));
-
-        // in this case, we include the file even when `ignore_subdirectory` is true because the
-        // path segment is a hive partition which doesn't count as a subdirectory for the purposes
-        // of `Url::contains`
-        assert!(url.contains(
-            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
-            true
-        ));
-
-        // testing an empty path with default config
-        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true));
-
-        // testing an empty path with `ignore_subdirectory` set to false
-        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
-    }
-}
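
Because the slimmed-down core module above now re-exports everything from the new crate (`pub use datafusion_catalog_listing::*;`), existing imports through the old path should keep compiling unchanged. A small sketch of the compatibility this re-export is meant to preserve, assuming both crates are in scope:

    // Old path, still available through the re-export in datafusion/core:
    use datafusion::datasource::listing::PartitionedFile;
    // New home of the same type after this commit:
    use datafusion_catalog_listing::PartitionedFile as MovedPartitionedFile;

    // Both paths name one type, so no conversion is needed.
    fn same_type(file: PartitionedFile) -> MovedPartitionedFile {
        file
    }
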
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index 780b229833..ca0aa92ff1 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -809,8 +809,9 @@ pub mod variable {
     pub use datafusion_expr::var_provider::{VarProvider, VarType};
 }
 
-#[cfg(test)]
+#[cfg(not(target_arch = "wasm32"))]
 pub mod test;
+
 pub mod test_util;
 
 #[cfg(doctest)]
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 0d659582ac..05e63a3c4f 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -61,7 +61,7 @@ pub fn create_table_dual() -> Arc<dyn TableProvider> {
         Field::new("name", DataType::Utf8, false),
     ]));
     let batch = RecordBatch::try_new(
-        dual_schema.clone(),
+        Arc::<Schema>::clone(&dual_schema),
         vec![
             Arc::new(Int32Array::from(vec![1])),
             Arc::new(array::StringArray::from(vec!["a"])),
@@ -244,7 +244,7 @@ pub fn table_with_sequence(
     let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
     let arr = Arc::new(Int32Array::from((seq_start..=seq_end).collect::<Vec<_>>()));
     let partitions = vec![vec![RecordBatch::try_new(
-        schema.clone(),
+        Arc::<Schema>::clone(&schema),
         vec![arr as ArrayRef],
     )?]];
     Ok(Arc::new(MemTable::try_new(schema, partitions)?))
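
The last two hunks replace `schema.clone()` with the fully qualified `Arc::<Schema>::clone(&schema)`. The two forms are functionally identical; the explicit one is the style lints such as clippy's `clone_on_ref_ptr` ask for, which is presumably the motivation here. A tiny sketch of the distinction:

    use std::sync::Arc;

    use arrow_schema::Schema;

    fn main() {
        let schema = Arc::new(Schema::empty());
        // Bumps the reference count, exactly like `schema.clone()`. But if
        // `schema` ever stopped being an Arc, this line would fail to compile
        // instead of silently becoming a deep copy of the Schema.
        let shared = Arc::<Schema>::clone(&schema);
        assert_eq!(Arc::strong_count(&schema), 2);
        drop(shared);
    }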

