This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new fe8ab01b29 make datafusion-catalog-listing and move some
implementation of listing out of datafusion/core/datasource/listing (#14464)
fe8ab01b29 is described below
commit fe8ab01b299c2431e61d055c9b44f24c1e8dc15c
Author: logan-keede <[email protected]>
AuthorDate: Wed Feb 5 19:29:57 2025 +0530
make datafusion-catalog-listing and move some implementation of listing out
of datafusion/core/datasource/listing (#14464)
* make datafusion_catalog_listing
* fix: this is a bit hacky
* fixes: prettier, taplo etc
* fixes: clippy
* minor: permalink commit hash -> main
* Tweak README
* fix: prettier + wasm
* prettier
* Put unit tests with code
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
Cargo.toml | 2 +
datafusion-cli/Cargo.lock | 23 ++
datafusion/catalog-listing/Cargo.toml | 65 ++++++
datafusion/catalog-listing/LICENSE.txt | 1 +
datafusion/catalog-listing/NOTICE.txt | 1 +
datafusion/catalog-listing/README.md | 30 +++
.../listing => catalog-listing/src}/helpers.rs | 130 ++++++++---
.../listing => catalog-listing/src}/mod.rs | 12 +-
.../listing => catalog-listing/src}/url.rs | 13 +-
datafusion/core/Cargo.toml | 1 +
datafusion/core/src/datasource/file_format/mod.rs | 2 +
datafusion/core/src/datasource/listing/mod.rs | 259 +--------------------
datafusion/core/src/lib.rs | 3 +-
datafusion/core/src/test/mod.rs | 4 +-
14 files changed, 245 insertions(+), 301 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index c1f8a604dd..0952c17887 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,6 +22,7 @@ members = [
"datafusion/common",
"datafusion/common-runtime",
"datafusion/catalog",
+ "datafusion/catalog-listing",
"datafusion/core",
"datafusion/expr",
"datafusion/expr-common",
@@ -100,6 +101,7 @@ ctor = "0.2.9"
dashmap = "6.0.1"
datafusion = { path = "datafusion/core", version = "45.0.0", default-features
= false }
datafusion-catalog = { path = "datafusion/catalog", version = "45.0.0" }
+datafusion-catalog-listing = { path = "datafusion/catalog-listing", version =
"45.0.0" }
datafusion-common = { path = "datafusion/common", version = "45.0.0",
default-features = false }
datafusion-common-runtime = { path = "datafusion/common-runtime", version =
"45.0.0" }
datafusion-doc = { path = "datafusion/doc", version = "45.0.0" }
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index bcbee29d6b..d1107d2a71 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1219,6 +1219,7 @@ dependencies = [
"bzip2 0.5.0",
"chrono",
"datafusion-catalog",
+ "datafusion-catalog-listing",
"datafusion-common",
"datafusion-common-runtime",
"datafusion-execution",
@@ -1274,6 +1275,28 @@ dependencies = [
"sqlparser",
]
+[[package]]
+name = "datafusion-catalog-listing"
+version = "45.0.0"
+dependencies = [
+ "arrow",
+ "arrow-schema",
+ "chrono",
+ "datafusion-catalog",
+ "datafusion-common",
+ "datafusion-execution",
+ "datafusion-expr",
+ "datafusion-physical-expr",
+ "datafusion-physical-expr-common",
+ "datafusion-physical-plan",
+ "futures",
+ "glob",
+ "itertools 0.14.0",
+ "log",
+ "object_store",
+ "url",
+]
+
[[package]]
name = "datafusion-cli"
version = "45.0.0"
diff --git a/datafusion/catalog-listing/Cargo.toml
b/datafusion/catalog-listing/Cargo.toml
new file mode 100644
index 0000000000..03132e7b7b
--- /dev/null
+++ b/datafusion/catalog-listing/Cargo.toml
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "datafusion-catalog-listing"
+description = "datafusion-catalog-listing"
+authors.workspace = true
+edition.workspace = true
+homepage.workspace = true
+license.workspace = true
+readme.workspace = true
+repository.workspace = true
+rust-version.workspace = true
+version.workspace = true
+
+[dependencies]
+arrow = { workspace = true }
+arrow-schema = { workspace = true }
+async-compression = { version = "0.4.0", features = [
+ "bzip2",
+ "gzip",
+ "xz",
+ "zstd",
+ "tokio",
+], optional = true }
+chrono = { workspace = true }
+datafusion-catalog = { workspace = true }
+datafusion-common = { workspace = true, features = ["object_store"] }
+datafusion-execution = { workspace = true }
+datafusion-expr = { workspace = true }
+datafusion-physical-expr = { workspace = true }
+datafusion-physical-expr-common = { workspace = true }
+datafusion-physical-plan = { workspace = true }
+futures = { workspace = true }
+glob = "0.3.0"
+itertools = { workspace = true }
+log = { workspace = true }
+object_store = { workspace = true }
+url = { workspace = true }
+
+[dev-dependencies]
+async-trait = { workspace = true }
+tempfile = { workspace = true }
+tokio = { workspace = true }
+
+[lints]
+workspace = true
+
+[lib]
+name = "datafusion_catalog_listing"
+path = "src/mod.rs"
diff --git a/datafusion/catalog-listing/LICENSE.txt
b/datafusion/catalog-listing/LICENSE.txt
new file mode 120000
index 0000000000..1ef648f64b
--- /dev/null
+++ b/datafusion/catalog-listing/LICENSE.txt
@@ -0,0 +1 @@
+../../LICENSE.txt
\ No newline at end of file
diff --git a/datafusion/catalog-listing/NOTICE.txt
b/datafusion/catalog-listing/NOTICE.txt
new file mode 120000
index 0000000000..fb051c92b1
--- /dev/null
+++ b/datafusion/catalog-listing/NOTICE.txt
@@ -0,0 +1 @@
+../../NOTICE.txt
\ No newline at end of file
diff --git a/datafusion/catalog-listing/README.md
b/datafusion/catalog-listing/README.md
new file mode 100644
index 0000000000..b4760c413d
--- /dev/null
+++ b/datafusion/catalog-listing/README.md
@@ -0,0 +1,30 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# DataFusion catalog-listing
+
+[DataFusion][df] is an extensible query execution framework, written in Rust,
that uses Apache Arrow as its in-memory format.
+
+This crate is a submodule of DataFusion with [ListingTable], an implementation
+of [TableProvider] based on files in a directory (either locally or on remote
+object storage such as S3).
+
+[df]: https://crates.io/crates/datafusion
+[listingtable]:
https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
+[tableprovider]:
https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html
diff --git a/datafusion/core/src/datasource/listing/helpers.rs
b/datafusion/catalog-listing/src/helpers.rs
similarity index 91%
rename from datafusion/core/src/datasource/listing/helpers.rs
rename to datafusion/catalog-listing/src/helpers.rs
index 228b9a4e9f..6cb3f661e6 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/catalog-listing/src/helpers.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
use super::ListingTableUrl;
use super::PartitionedFile;
-use crate::execution::context::SessionState;
+use datafusion_catalog::Session;
use datafusion_common::internal_err;
use datafusion_common::{HashMap, Result, ScalarValue};
use datafusion_expr::{BinaryExpr, Operator};
@@ -154,7 +154,7 @@ pub fn split_files(
chunks
}
-struct Partition {
+pub struct Partition {
/// The path to the partition, including the table prefix
path: Path,
/// How many path segments below the table prefix `path` contains
@@ -183,7 +183,7 @@ impl Partition {
}
/// Returns a recursive list of the partitions in `table_path` up to
`max_depth`
-async fn list_partitions(
+pub async fn list_partitions(
store: &dyn ObjectStore,
table_path: &ListingTableUrl,
max_depth: usize,
@@ -364,7 +364,7 @@ fn populate_partition_values<'a>(
}
}
-fn evaluate_partition_prefix<'a>(
+pub fn evaluate_partition_prefix<'a>(
partition_cols: &'a [(String, DataType)],
filters: &'a [Expr],
) -> Option<Path> {
@@ -405,7 +405,7 @@ fn evaluate_partition_prefix<'a>(
/// `filters` should only contain expressions that can be evaluated
/// using only the partition columns.
pub async fn pruned_partition_list<'a>(
- ctx: &'a SessionState,
+ ctx: &'a dyn Session,
store: &'a dyn ObjectStore,
table_path: &'a ListingTableUrl,
filters: &'a [Expr],
@@ -489,7 +489,7 @@ pub async fn pruned_partition_list<'a>(
/// Extract the partition values for the given `file_path` (in the given
`table_path`)
/// associated to the partitions defined by `table_partition_cols`
-fn parse_partitions_for_path<'a, I>(
+pub fn parse_partitions_for_path<'a, I>(
table_path: &ListingTableUrl,
file_path: &'a Path,
table_partition_cols: I,
@@ -517,17 +517,36 @@ where
}
Some(part_values)
}
+/// Describe a partition as a (path, depth, files) tuple for easier assertions
+pub fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {
+ (
+ partition.path.as_ref(),
+ partition.depth,
+ partition
+ .files
+ .as_ref()
+ .map(|f| f.iter().map(|f|
f.location.filename().unwrap()).collect())
+ .unwrap_or_default(),
+ )
+}
#[cfg(test)]
mod tests {
+ use async_trait::async_trait;
+ use datafusion_execution::config::SessionConfig;
+ use datafusion_execution::runtime_env::RuntimeEnv;
+ use futures::FutureExt;
+ use object_store::memory::InMemory;
+ use std::any::Any;
use std::ops::Not;
-
- use futures::StreamExt;
-
- use crate::test::object_store::make_test_store_and_state;
- use datafusion_expr::{case, col, lit, Expr};
+ // use futures::StreamExt;
use super::*;
+ use datafusion_expr::{
+ case, col, lit, AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF,
+ };
+ use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+ use datafusion_physical_plan::ExecutionPlan;
#[test]
fn test_split_files() {
@@ -578,7 +597,7 @@ mod tests {
]);
let filter = Expr::eq(col("mypartition"), lit("val1"));
let pruned = pruned_partition_list(
- &state,
+ state.as_ref(),
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter],
@@ -603,7 +622,7 @@ mod tests {
]);
let filter = Expr::eq(col("mypartition"), lit("val1"));
let pruned = pruned_partition_list(
- &state,
+ state.as_ref(),
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter],
@@ -643,7 +662,7 @@ mod tests {
let filter1 = Expr::eq(col("part1"), lit("p1v2"));
let filter2 = Expr::eq(col("part2"), lit("p2v1"));
let pruned = pruned_partition_list(
- &state,
+ state.as_ref(),
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter1, filter2],
@@ -680,19 +699,6 @@ mod tests {
);
}
- /// Describe a partition as a (path, depth, files) tuple for easier
assertions
- fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {
- (
- partition.path.as_ref(),
- partition.depth,
- partition
- .files
- .as_ref()
- .map(|f| f.iter().map(|f|
f.location.filename().unwrap()).collect())
- .unwrap_or_default(),
- )
- }
-
#[tokio::test]
async fn test_list_partition() {
let (store, _) = make_test_store_and_state(&[
@@ -994,4 +1000,74 @@ mod tests {
Some(Path::from("a=1970-01-05")),
);
}
+
+ pub fn make_test_store_and_state(
+ files: &[(&str, u64)],
+ ) -> (Arc<InMemory>, Arc<dyn Session>) {
+ let memory = InMemory::new();
+
+ for (name, size) in files {
+ memory
+ .put(&Path::from(*name), vec![0; *size as usize].into())
+ .now_or_never()
+ .unwrap()
+ .unwrap();
+ }
+
+ (Arc::new(memory), Arc::new(MockSession {}))
+ }
+
+ struct MockSession {}
+
+ #[async_trait]
+ impl Session for MockSession {
+ fn session_id(&self) -> &str {
+ unimplemented!()
+ }
+
+ fn config(&self) -> &SessionConfig {
+ unimplemented!()
+ }
+
+ async fn create_physical_plan(
+ &self,
+ _logical_plan: &LogicalPlan,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ unimplemented!()
+ }
+
+ fn create_physical_expr(
+ &self,
+ _expr: Expr,
+ _df_schema: &DFSchema,
+ ) -> Result<Arc<dyn PhysicalExpr>> {
+ unimplemented!()
+ }
+
+ fn scalar_functions(&self) -> &std::collections::HashMap<String,
Arc<ScalarUDF>> {
+ unimplemented!()
+ }
+
+ fn aggregate_functions(
+ &self,
+ ) -> &std::collections::HashMap<String, Arc<AggregateUDF>> {
+ unimplemented!()
+ }
+
+ fn window_functions(&self) -> &std::collections::HashMap<String,
Arc<WindowUDF>> {
+ unimplemented!()
+ }
+
+ fn runtime_env(&self) -> &Arc<RuntimeEnv> {
+ unimplemented!()
+ }
+
+ fn execution_props(&self) -> &ExecutionProps {
+ unimplemented!()
+ }
+
+ fn as_any(&self) -> &dyn Any {
+ unimplemented!()
+ }
+ }
}
diff --git a/datafusion/core/src/datasource/listing/mod.rs
b/datafusion/catalog-listing/src/mod.rs
similarity index 95%
copy from datafusion/core/src/datasource/listing/mod.rs
copy to datafusion/catalog-listing/src/mod.rs
index f11653ce1e..e952e39fd4 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/catalog-listing/src/mod.rs
@@ -18,9 +18,8 @@
//! A table that uses the `ObjectStore` listing capability
//! to get the list of files to process.
-mod helpers;
-mod table;
-mod url;
+pub mod helpers;
+pub mod url;
use chrono::TimeZone;
use datafusion_common::Result;
@@ -31,7 +30,6 @@ use std::pin::Pin;
use std::sync::Arc;
pub use self::url::ListingTableUrl;
-pub use table::{ListingOptions, ListingTable, ListingTableConfig};
/// Stream of files get listed from object store
pub type PartitionedFileStream =
@@ -68,9 +66,9 @@ pub struct PartitionedFile {
/// You may use [`wrap_partition_value_in_dict`] to wrap them if you have
used [`wrap_partition_type_in_dict`] to wrap the column type.
///
///
- /// [`wrap_partition_type_in_dict`]:
crate::datasource::physical_plan::wrap_partition_type_in_dict
- /// [`wrap_partition_value_in_dict`]:
crate::datasource::physical_plan::wrap_partition_value_in_dict
- /// [`table_partition_cols`]: table::ListingOptions::table_partition_cols
+ /// [`wrap_partition_type_in_dict`]:
https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L55
+ /// [`wrap_partition_value_in_dict`]:
https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L62
+ /// [`table_partition_cols`]:
https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/file_format/options.rs#L190
pub partition_values: Vec<ScalarValue>,
/// An optional file range for a more fine-grained parallel execution
pub range: Option<FileRange>,
diff --git a/datafusion/core/src/datasource/listing/url.rs
b/datafusion/catalog-listing/src/url.rs
similarity index 98%
rename from datafusion/core/src/datasource/listing/url.rs
rename to datafusion/catalog-listing/src/url.rs
index 6fb536ca2f..2e6415ba3b 100644
--- a/datafusion/core/src/datasource/listing/url.rs
+++ b/datafusion/catalog-listing/src/url.rs
@@ -15,10 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-use crate::execution::context::SessionState;
+use datafusion_catalog::Session;
use datafusion_common::{DataFusionError, Result};
use datafusion_execution::object_store::ObjectStoreUrl;
-use datafusion_optimizer::OptimizerConfig;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use glob::Pattern;
@@ -194,7 +193,7 @@ impl ListingTableUrl {
///
/// Examples:
/// ```rust
- /// use datafusion::datasource::listing::ListingTableUrl;
+ /// use datafusion_catalog_listing::ListingTableUrl;
/// let url = ListingTableUrl::parse("file:///foo/bar.csv").unwrap();
/// assert_eq!(url.file_extension(), Some("csv"));
/// let url = ListingTableUrl::parse("file:///foo/bar").unwrap();
@@ -216,7 +215,7 @@ impl ListingTableUrl {
/// Strips the prefix of this [`ListingTableUrl`] from the provided path,
returning
/// an iterator of the remaining path segments
- pub(crate) fn strip_prefix<'a, 'b: 'a>(
+ pub fn strip_prefix<'a, 'b: 'a>(
&'a self,
path: &'b Path,
) -> Option<impl Iterator<Item = &'b str> + 'a> {
@@ -230,11 +229,11 @@ impl ListingTableUrl {
/// List all files identified by this [`ListingTableUrl`] for the provided
`file_extension`
pub async fn list_all_files<'a>(
&'a self,
- ctx: &'a SessionState,
+ ctx: &'a dyn Session,
store: &'a dyn ObjectStore,
file_extension: &'a str,
) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
- let exec_options = &ctx.options().execution;
+ let exec_options = &ctx.config_options().execution;
let ignore_subdirectory =
exec_options.listing_table_ignore_subdirectory;
// If the prefix is a file, use a head request, otherwise list
let list = match self.is_collection() {
@@ -325,6 +324,7 @@ impl std::fmt::Display for ListingTableUrl {
}
}
+#[cfg(not(target_arch = "wasm32"))]
const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
/// Splits `path` at the first path segment containing a glob expression,
returning
@@ -333,6 +333,7 @@ const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
/// Path delimiters are determined using [`std::path::is_separator`] which
/// permits `/` as a path delimiter even on Windows platforms.
///
+#[cfg(not(target_arch = "wasm32"))]
fn split_glob_expression(path: &str) -> Option<(&str, &str)> {
let mut last_separator = 0;
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index b708c18f5b..815191fd3c 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -100,6 +100,7 @@ bytes = { workspace = true }
bzip2 = { version = "0.5.0", optional = true }
chrono = { workspace = true }
datafusion-catalog = { workspace = true }
+datafusion-catalog-listing = { workspace = true }
datafusion-common = { workspace = true, features = ["object_store"] }
datafusion-common-runtime = { workspace = true }
datafusion-execution = { workspace = true }
diff --git a/datafusion/core/src/datasource/file_format/mod.rs
b/datafusion/core/src/datasource/file_format/mod.rs
index f47e2107ad..2e2e6dba1c 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -425,6 +425,7 @@ pub fn transform_schema_to_view(schema: &Schema) -> Schema {
}
/// Coerces the file schema if the table schema uses a view type.
+#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn coerce_file_schema_to_view_type(
table_schema: &Schema,
file_schema: &Schema,
@@ -489,6 +490,7 @@ pub fn transform_binary_to_string(schema: &Schema) ->
Schema {
/// If the table schema uses a string type, coerce the file schema to use a
string type.
///
/// See [parquet::ParquetFormat::binary_as_string] for details
+#[cfg(not(target_arch = "wasm32"))]
pub(crate) fn coerce_file_schema_to_string_type(
table_schema: &Schema,
file_schema: &Schema,
diff --git a/datafusion/core/src/datasource/listing/mod.rs
b/datafusion/core/src/datasource/listing/mod.rs
index f11653ce1e..39323b993d 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -18,263 +18,6 @@
//! A table that uses the `ObjectStore` listing capability
//! to get the list of files to process.
-mod helpers;
mod table;
-mod url;
-
-use chrono::TimeZone;
-use datafusion_common::Result;
-use datafusion_common::{ScalarValue, Statistics};
-use futures::Stream;
-use object_store::{path::Path, ObjectMeta};
-use std::pin::Pin;
-use std::sync::Arc;
-
-pub use self::url::ListingTableUrl;
+pub use datafusion_catalog_listing::*;
pub use table::{ListingOptions, ListingTable, ListingTableConfig};
-
-/// Stream of files get listed from object store
-pub type PartitionedFileStream =
- Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync +
'static>>;
-
-/// Only scan a subset of Row Groups from the Parquet file whose data
"midpoint"
-/// lies within the [start, end) byte offsets. This option can be used to scan
non-overlapping
-/// sections of a Parquet file in parallel.
-#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
-pub struct FileRange {
- /// Range start
- pub start: i64,
- /// Range end
- pub end: i64,
-}
-
-impl FileRange {
- /// returns true if this file range contains the specified offset
- pub fn contains(&self, offset: i64) -> bool {
- offset >= self.start && offset < self.end
- }
-}
-
-#[derive(Debug, Clone)]
-/// A single file or part of a file that should be read, along with its
schema, statistics
-/// and partition column values that need to be appended to each row.
-pub struct PartitionedFile {
- /// Path for the file (e.g. URL, filesystem path, etc)
- pub object_meta: ObjectMeta,
- /// Values of partition columns to be appended to each row.
- ///
- /// These MUST have the same count, order, and type than the
[`table_partition_cols`].
- ///
- /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have
used [`wrap_partition_type_in_dict`] to wrap the column type.
- ///
- ///
- /// [`wrap_partition_type_in_dict`]:
crate::datasource::physical_plan::wrap_partition_type_in_dict
- /// [`wrap_partition_value_in_dict`]:
crate::datasource::physical_plan::wrap_partition_value_in_dict
- /// [`table_partition_cols`]: table::ListingOptions::table_partition_cols
- pub partition_values: Vec<ScalarValue>,
- /// An optional file range for a more fine-grained parallel execution
- pub range: Option<FileRange>,
- /// Optional statistics that describe the data in this file if known.
- ///
- /// DataFusion relies on these statistics for planning (in particular to
sort file groups),
- /// so if they are incorrect, incorrect answers may result.
- pub statistics: Option<Statistics>,
- /// An optional field for user defined per object metadata
- pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
- /// The estimated size of the parquet metadata, in bytes
- pub metadata_size_hint: Option<usize>,
-}
-
-impl PartitionedFile {
- /// Create a simple file without metadata or partition
- pub fn new(path: impl Into<String>, size: u64) -> Self {
- Self {
- object_meta: ObjectMeta {
- location: Path::from(path.into()),
- last_modified: chrono::Utc.timestamp_nanos(0),
- size: size as usize,
- e_tag: None,
- version: None,
- },
- partition_values: vec![],
- range: None,
- statistics: None,
- extensions: None,
- metadata_size_hint: None,
- }
- }
-
- /// Create a file range without metadata or partition
- pub fn new_with_range(path: String, size: u64, start: i64, end: i64) ->
Self {
- Self {
- object_meta: ObjectMeta {
- location: Path::from(path),
- last_modified: chrono::Utc.timestamp_nanos(0),
- size: size as usize,
- e_tag: None,
- version: None,
- },
- partition_values: vec![],
- range: Some(FileRange { start, end }),
- statistics: None,
- extensions: None,
- metadata_size_hint: None,
- }
- .with_range(start, end)
- }
-
- /// Provide a hint to the size of the file metadata. If a hint is provided
- /// the reader will try and fetch the last `size_hint` bytes of the
parquet file optimistically.
- /// Without an appropriate hint, two read may be required to fetch the
metadata.
- pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) ->
Self {
- self.metadata_size_hint = Some(metadata_size_hint);
- self
- }
-
- /// Return a file reference from the given path
- pub fn from_path(path: String) -> Result<Self> {
- let size = std::fs::metadata(path.clone())?.len();
- Ok(Self::new(path, size))
- }
-
- /// Return the path of this partitioned file
- pub fn path(&self) -> &Path {
- &self.object_meta.location
- }
-
- /// Update the file to only scan the specified range (in bytes)
- pub fn with_range(mut self, start: i64, end: i64) -> Self {
- self.range = Some(FileRange { start, end });
- self
- }
-
- /// Update the user defined extensions for this file.
- ///
- /// This can be used to pass reader specific information.
- pub fn with_extensions(
- mut self,
- extensions: Arc<dyn std::any::Any + Send + Sync>,
- ) -> Self {
- self.extensions = Some(extensions);
- self
- }
-}
-
-impl From<ObjectMeta> for PartitionedFile {
- fn from(object_meta: ObjectMeta) -> Self {
- PartitionedFile {
- object_meta,
- partition_values: vec![],
- range: None,
- statistics: None,
- extensions: None,
- metadata_size_hint: None,
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::ListingTableUrl;
- use datafusion_execution::object_store::{
- DefaultObjectStoreRegistry, ObjectStoreRegistry,
- };
- use object_store::{local::LocalFileSystem, path::Path};
- use std::{ops::Not, sync::Arc};
- use url::Url;
-
- #[test]
- fn test_object_store_listing_url() {
- let listing = ListingTableUrl::parse("file:///").unwrap();
- let store = listing.object_store();
- assert_eq!(store.as_str(), "file:///");
-
- let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
- let store = listing.object_store();
- assert_eq!(store.as_str(), "s3://bucket/");
- }
-
- #[test]
- fn test_get_store_hdfs() {
- let sut = DefaultObjectStoreRegistry::default();
- let url = Url::parse("hdfs://localhost:8020").unwrap();
- sut.register_store(&url, Arc::new(LocalFileSystem::new()));
- let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
- sut.get_store(url.as_ref()).unwrap();
- }
-
- #[test]
- fn test_get_store_s3() {
- let sut = DefaultObjectStoreRegistry::default();
- let url = Url::parse("s3://bucket/key").unwrap();
- sut.register_store(&url, Arc::new(LocalFileSystem::new()));
- let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
- sut.get_store(url.as_ref()).unwrap();
- }
-
- #[test]
- fn test_get_store_file() {
- let sut = DefaultObjectStoreRegistry::default();
- let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
- sut.get_store(url.as_ref()).unwrap();
- }
-
- #[test]
- fn test_get_store_local() {
- let sut = DefaultObjectStoreRegistry::default();
- let url = ListingTableUrl::parse("../").unwrap();
- sut.get_store(url.as_ref()).unwrap();
- }
-
- #[test]
- fn test_url_contains() {
- let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();
-
- // standard case with default config
- assert!(url.contains(
- &Path::parse("/var/data/mytable/data.parquet").unwrap(),
- true
- ));
-
- // standard case with `ignore_subdirectory` set to false
- assert!(url.contains(
- &Path::parse("/var/data/mytable/data.parquet").unwrap(),
- false
- ));
-
- // as per documentation, when `ignore_subdirectory` is true, we should
ignore files that aren't
- // a direct child of the `url`
- assert!(url
- .contains(
-
&Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
- true
- )
- .not());
-
- // when we set `ignore_subdirectory` to false, we should not ignore
the file
- assert!(url.contains(
-
&Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
- false
- ));
-
- // as above, `ignore_subdirectory` is false, so we include the file
- assert!(url.contains(
- &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
- false
- ));
-
- // in this case, we include the file even when `ignore_subdirectory`
is true because the
- // path segment is a hive partition which doesn't count as a
subdirectory for the purposes
- // of `Url::contains`
- assert!(url.contains(
- &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
- true
- ));
-
- // testing an empty path with default config
- assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(),
true));
-
- // testing an empty path with `ignore_subdirectory` set to false
- assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(),
false));
- }
-}
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index 780b229833..ca0aa92ff1 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -809,8 +809,9 @@ pub mod variable {
pub use datafusion_expr::var_provider::{VarProvider, VarType};
}
-#[cfg(test)]
+#[cfg(not(target_arch = "wasm32"))]
pub mod test;
+
pub mod test_util;
#[cfg(doctest)]
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index 0d659582ac..05e63a3c4f 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -61,7 +61,7 @@ pub fn create_table_dual() -> Arc<dyn TableProvider> {
Field::new("name", DataType::Utf8, false),
]));
let batch = RecordBatch::try_new(
- dual_schema.clone(),
+ Arc::<Schema>::clone(&dual_schema),
vec![
Arc::new(Int32Array::from(vec![1])),
Arc::new(array::StringArray::from(vec!["a"])),
@@ -244,7 +244,7 @@ pub fn table_with_sequence(
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32,
true)]));
let arr =
Arc::new(Int32Array::from((seq_start..=seq_end).collect::<Vec<_>>()));
let partitions = vec![vec![RecordBatch::try_new(
- schema.clone(),
+ Arc::<Schema>::clone(&schema),
vec![arr as ArrayRef],
)?]];
Ok(Arc::new(MemTable::try_new(schema, partitions)?))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]