This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b131cac1b0 Consolidate custom data source examples (#18142) (#18553)
b131cac1b0 is described below

commit b131cac1b0cf6ace6e250a1b32a6f210269dec13
Author: Sergey Zhukov <[email protected]>
AuthorDate: Sun Nov 9 19:48:35 2025 +0300

    Consolidate custom data source examples (#18142) (#18553)
    
    ## Which issue does this PR close?

    This PR consolidates all the `custom_data_source` examples into a
    single example binary. We agreed on this pattern and can apply it to
    the remaining examples.

    - Part of https://github.com/apache/datafusion/issues/18142.
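
    The consolidated binary selects which demo to run via a subcommand.
    A minimal usage sketch (run from the `datafusion-examples` directory;
    the full list of subcommands is in `examples/custom_data_source/main.rs`
    below):

    ```bash
    cargo run --example custom_data_source -- custom_datasource
    ```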
    
    
    ---------
    
    Co-authored-by: Sergey Zhukov <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion-examples/Cargo.toml                     |   4 -
 datafusion-examples/README.md                      |  12 +-
 .../examples/builtin_functions/main.rs             |   5 +
 .../{ => custom_data_source}/csv_json_opener.rs    |   3 +-
 .../{ => custom_data_source}/csv_sql_streaming.rs  |   3 +-
 .../{ => custom_data_source}/custom_datasource.rs  |   3 +-
 .../{ => custom_data_source}/custom_file_casts.rs  |   4 +-
 .../{ => custom_data_source}/custom_file_format.rs |  74 ++++++------
 .../file_stream_provider.rs                        |  36 +++---
 .../examples/custom_data_source/main.rs            | 126 +++++++++++++++++++++
 datafusion-examples/examples/flight/main.rs        |   5 +
 datafusion-examples/examples/udf/main.rs           |   5 +
 12 files changed, 210 insertions(+), 70 deletions(-)

diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml
index 38f1f8b0e0..61711f8472 100644
--- a/datafusion-examples/Cargo.toml
+++ b/datafusion-examples/Cargo.toml
@@ -43,10 +43,6 @@ path = "examples/external_dependency/dataframe-to-s3.rs"
 name = "query_aws_s3"
 path = "examples/external_dependency/query-aws-s3.rs"
 
-[[example]]
-name = "custom_file_casts"
-path = "examples/custom_file_casts.rs"
-
 [dev-dependencies]
 arrow = { workspace = true }
 # arrow_schema is required for record_batch! macro :sad:
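
(Note: with the sources moved under `examples/custom_data_source/` and given their own `main.rs`, Cargo auto-discovers the example from the directory layout, so the explicit `[[example]]` entry removed above is no longer needed.)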
diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index 1befba6be6..62e51a7900 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -54,18 +54,18 @@ cargo run --example dataframe
 - [`analyzer_rule.rs`](examples/analyzer_rule.rs): Use a custom AnalyzerRule to change a query's semantics (row level access control)
 - [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog
 - [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization
-- [`csv_sql_streaming.rs`](examples/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file
-- [`csv_json_opener.rs`](examples/csv_json_opener.rs): Use low level `FileOpener` APIs to read CSV/JSON into Arrow `RecordBatch`es
-- [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
-- [`custom_file_casts.rs`](examples/custom_file_casts.rs): Implement custom casting rules to adapt file schemas
-- [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
+- [`examples/custom_data_source/csv_sql_streaming.rs`](examples/custom_data_source/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file
+- [`examples/custom_data_source/csv_json_opener.rs`](examples/custom_data_source/csv_json_opener.rs): Use low level `FileOpener` APIs to read CSV/JSON into Arrow `RecordBatch`es
+- [`examples/custom_data_source/custom_datasource.rs`](examples/custom_data_source/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
+- [`examples/custom_data_source/custom_file_casts.rs`](examples/custom_data_source/custom_file_casts.rs): Implement custom casting rules to adapt file schemas
+- [`examples/custom_data_source/custom_file_format.rs`](examples/custom_data_source/custom_file_format.rs): Write data to a custom file format
 - [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
 - [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data, including multiple subqueries. Also demonstrates the various methods to write out a DataFrame to a table, parquet file, csv file, and json file.
 - [`examples/builtin_functions/date_time`](examples/builtin_functions/date_time.rs): Examples of date-time related functions and queries
 - [`default_column_values.rs`](examples/default_column_values.rs): Implement custom default value handling for missing columns using field metadata and PhysicalExprAdapter
 - [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results (Arrow ArrayRefs) into Rust structs
 - [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s
-- [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
+- [`examples/custom_data_source/file_stream_provider.rs`](examples/custom_data_source/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
 - [`flight/sql_server.rs`](examples/flight/sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from Flight and FlightSQL (e.g. JDBC) clients
 - [`examples/builtin_functions/function_factory.rs`](examples/builtin_functions/function_factory.rs): Register `CREATE FUNCTION` handler to implement SQL macros
 - [`memory_pool_tracking.rs`](examples/memory_pool_tracking.rs): Demonstrates TrackConsumersPool for memory tracking and debugging with enhanced error messages
diff --git a/datafusion-examples/examples/builtin_functions/main.rs b/datafusion-examples/examples/builtin_functions/main.rs
index 3399c395bf..c307bc9532 100644
--- a/datafusion-examples/examples/builtin_functions/main.rs
+++ b/datafusion-examples/examples/builtin_functions/main.rs
@@ -19,6 +19,11 @@
 //!
 //! These examples demonstrate miscellaneous function-related features.
 //!
+//! ## Usage
+//! ```bash
+//! cargo run --example builtin_functions -- [date_time|function_factory|regexp]
+//! ```
+//!
 //! Each subcommand runs a corresponding example:
 //! - `date_time` — examples of date-time related functions and queries
 //! - `function_factory` — register `CREATE FUNCTION` handler to implement SQL macros
diff --git a/datafusion-examples/examples/csv_json_opener.rs b/datafusion-examples/examples/custom_data_source/csv_json_opener.rs
similarity index 99%
rename from datafusion-examples/examples/csv_json_opener.rs
rename to datafusion-examples/examples/custom_data_source/csv_json_opener.rs
index 6d0e4f4a3d..4205bbcdf8 100644
--- a/datafusion-examples/examples/csv_json_opener.rs
+++ b/datafusion-examples/examples/custom_data_source/csv_json_opener.rs
@@ -40,8 +40,7 @@ use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};
 /// read data from (CSV/JSON) into Arrow RecordBatches.
 ///
 /// If you want to query data in CSV or JSON files, see the [`dataframe.rs`] and [`sql_query.rs`] examples
-#[tokio::main]
-async fn main() -> Result<()> {
+pub async fn csv_json_opener() -> Result<()> {
     csv_opener().await?;
     json_opener().await?;
     Ok(())
diff --git a/datafusion-examples/examples/csv_sql_streaming.rs b/datafusion-examples/examples/custom_data_source/csv_sql_streaming.rs
similarity index 98%
rename from datafusion-examples/examples/csv_sql_streaming.rs
rename to datafusion-examples/examples/custom_data_source/csv_sql_streaming.rs
index 99264bbcb4..aca63c4f35 100644
--- a/datafusion-examples/examples/csv_sql_streaming.rs
+++ b/datafusion-examples/examples/custom_data_source/csv_sql_streaming.rs
@@ -21,8 +21,7 @@ use datafusion::prelude::*;
 
 /// This example demonstrates executing a simple query against an Arrow data source (CSV) and
 /// fetching results with streaming aggregation and streaming window
-#[tokio::main]
-async fn main() -> Result<()> {
+pub async fn csv_sql_streaming() -> Result<()> {
     // create local execution context
     let ctx = SessionContext::new();
 
diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_data_source/custom_datasource.rs
similarity index 99%
rename from datafusion-examples/examples/custom_datasource.rs
rename to datafusion-examples/examples/custom_data_source/custom_datasource.rs
index bc865fac5a..2213d50fcc 100644
--- a/datafusion-examples/examples/custom_datasource.rs
+++ b/datafusion-examples/examples/custom_data_source/custom_datasource.rs
@@ -42,8 +42,7 @@ use datafusion::catalog::Session;
 use tokio::time::timeout;
 
 /// This example demonstrates executing a simple query against a custom datasource
-#[tokio::main]
-async fn main() -> Result<()> {
+pub async fn custom_datasource() -> Result<()> {
     // create our custom datasource and add some users
     let db = CustomDataSource::default();
     db.populate_users();
diff --git a/datafusion-examples/examples/custom_file_casts.rs b/datafusion-examples/examples/custom_data_source/custom_file_casts.rs
similarity index 99%
rename from datafusion-examples/examples/custom_file_casts.rs
rename to datafusion-examples/examples/custom_data_source/custom_file_casts.rs
index 4d97ecd91d..31ec2845c6 100644
--- a/datafusion-examples/examples/custom_file_casts.rs
+++ b/datafusion-examples/examples/custom_data_source/custom_file_casts.rs
@@ -44,9 +44,7 @@ use object_store::{ObjectStore, PutPayload};
 // This example enforces that casts must be strictly widening: if the file type is Int64 and the table type is Int32, it will error
 // before even reading the data.
 // Without this custom cast rule DataFusion would happily do the narrowing cast, potentially erroring only if it found a row with data it could not cast.
-
-#[tokio::main]
-async fn main() -> Result<()> {
+pub async fn custom_file_casts() -> Result<()> {
     println!("=== Creating example data ===");
 
     // Create a logical / table schema with an Int32 column
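
The "strictly widening" rule described in the comments above can be captured by a small predicate over Arrow types. A hypothetical sketch for illustration only (the real example wires such a check into DataFusion's schema-adapter machinery, which is omitted here):

```rust
use datafusion::arrow::datatypes::DataType;

/// Hypothetical helper: returns true only for casts that cannot lose
/// information, e.g. Int32 -> Int64 is allowed but Int64 -> Int32 is not.
fn is_widening_cast(from: &DataType, to: &DataType) -> bool {
    use DataType::*;
    from == to
        || matches!(
            (from, to),
            (Int8, Int16 | Int32 | Int64)
                | (Int16, Int32 | Int64)
                | (Int32, Int64)
                | (Float32, Float64)
        )
}
```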
diff --git a/datafusion-examples/examples/custom_file_format.rs b/datafusion-examples/examples/custom_data_source/custom_file_format.rs
similarity index 97%
rename from datafusion-examples/examples/custom_file_format.rs
rename to datafusion-examples/examples/custom_data_source/custom_file_format.rs
index 3505651eb1..510fa53c59 100644
--- a/datafusion-examples/examples/custom_file_format.rs
+++ b/datafusion-examples/examples/custom_data_source/custom_file_format.rs
@@ -48,6 +48,42 @@ use tempfile::tempdir;
 /// TSVFileFormatFactory is responsible for creating instances of TSVFileFormat.
 /// The former, once registered with the SessionState, will then be used
 /// to facilitate SQL operations on TSV files, such as `COPY TO` shown here.
+pub async fn custom_file_format() -> Result<()> {
+    // Create a new context with the default configuration
+    let mut state = SessionStateBuilder::new().with_default_features().build();
+
+    // Register the custom file format
+    let file_format = Arc::new(TSVFileFactory::new());
+    state.register_file_format(file_format, true)?;
+
+    // Create a new context with the custom file format
+    let ctx = SessionContext::new_with_state(state);
+
+    let mem_table = create_mem_table();
+    ctx.register_table("mem_table", mem_table)?;
+
+    let temp_dir = tempdir().unwrap();
+    let table_save_path = temp_dir.path().join("mem_table.tsv");
+
+    let d = ctx
+        .sql(&format!(
+            "COPY mem_table TO '{}' STORED AS TSV;",
+            table_save_path.display(),
+        ))
+        .await?;
+
+    let results = d.collect().await?;
+    println!(
+        "Number of inserted rows: {:?}",
+        (results[0]
+            .column_by_name("count")
+            .unwrap()
+            .as_primitive::<UInt64Type>()
+            .value(0))
+    );
+
+    Ok(())
+}
 
 #[derive(Debug)]
 /// Custom file format that reads and writes TSV files
@@ -181,44 +217,6 @@ impl GetExt for TSVFileFactory {
     }
 }
 
-#[tokio::main]
-async fn main() -> Result<()> {
-    // Create a new context with the default configuration
-    let mut state = SessionStateBuilder::new().with_default_features().build();
-
-    // Register the custom file format
-    let file_format = Arc::new(TSVFileFactory::new());
-    state.register_file_format(file_format, true).unwrap();
-
-    // Create a new context with the custom file format
-    let ctx = SessionContext::new_with_state(state);
-
-    let mem_table = create_mem_table();
-    ctx.register_table("mem_table", mem_table).unwrap();
-
-    let temp_dir = tempdir().unwrap();
-    let table_save_path = temp_dir.path().join("mem_table.tsv");
-
-    let d = ctx
-        .sql(&format!(
-            "COPY mem_table TO '{}' STORED AS TSV;",
-            table_save_path.display(),
-        ))
-        .await?;
-
-    let results = d.collect().await?;
-    println!(
-        "Number of inserted rows: {:?}",
-        (results[0]
-            .column_by_name("count")
-            .unwrap()
-            .as_primitive::<UInt64Type>()
-            .value(0))
-    );
-
-    Ok(())
-}
-
 // create a simple mem table
 fn create_mem_table() -> Arc<MemTable> {
     let fields = vec![
diff --git a/datafusion-examples/examples/file_stream_provider.rs b/datafusion-examples/examples/custom_data_source/file_stream_provider.rs
similarity index 91%
rename from datafusion-examples/examples/file_stream_provider.rs
rename to datafusion-examples/examples/custom_data_source/file_stream_provider.rs
index e6c59d57e9..55d2cc8cc0 100644
--- a/datafusion-examples/examples/file_stream_provider.rs
+++ b/datafusion-examples/examples/custom_data_source/file_stream_provider.rs
@@ -15,6 +15,29 @@
 // specific language governing permissions and limitations
 // under the License.
 
+/// Demonstrates how to use [`FileStreamProvider`] and [`StreamTable`] to stream data
+/// from a file-like source (FIFO) into DataFusion for continuous querying.
+///
+/// On non-Windows systems, this example creates a named pipe (FIFO) and
+/// writes rows into it asynchronously while DataFusion reads the data
+/// through a `FileStreamProvider`.  
+///
+/// This illustrates how to integrate dynamically updated data sources
+/// with DataFusion without needing to reload the entire dataset each time.
+///
+/// This example does not work on Windows.
+pub async fn file_stream_provider() -> datafusion::error::Result<()> {
+    #[cfg(target_os = "windows")]
+    {
+        println!("file_stream_provider example does not work on windows");
+        Ok(())
+    }
+    #[cfg(not(target_os = "windows"))]
+    {
+        non_windows::main().await
+    }
+}
+
 #[cfg(not(target_os = "windows"))]
 mod non_windows {
     use datafusion::assert_batches_eq;
@@ -186,16 +209,3 @@ mod non_windows {
         Ok(())
     }
 }
-
-#[tokio::main]
-async fn main() -> datafusion::error::Result<()> {
-    #[cfg(target_os = "windows")]
-    {
-        println!("file_stream_provider example does not work on windows");
-        Ok(())
-    }
-    #[cfg(not(target_os = "windows"))]
-    {
-        non_windows::main().await
-    }
-}
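
For reference, the core pattern the relocated example demonstrates is registering a file-backed stream as a table. A minimal sketch, assuming the `datafusion::datasource::stream` API and a hypothetical FIFO path (the real example creates the FIFO and spawns a writer task; see file_stream_provider.rs):

```rust
use std::path::PathBuf;
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable};
use datafusion::prelude::SessionContext;

fn register_fifo_table(ctx: &SessionContext) -> datafusion::error::Result<()> {
    // Schema of the rows the producer writes into the FIFO
    let schema = Arc::new(Schema::new(vec![Field::new("a1", DataType::Utf8, false)]));
    // Hypothetical FIFO path; the real example creates it with mkfifo
    let fifo_path = PathBuf::from("/tmp/example_fifo");
    // FileStreamProvider reads record batches from the file-like source as data arrives
    let provider = FileStreamProvider::new_file(schema, fifo_path);
    let table = StreamTable::new(Arc::new(StreamConfig::new(Arc::new(provider))));
    // Queries against "fifo" now stream rows as the producer writes them
    ctx.register_table("fifo", Arc::new(table))?;
    Ok(())
}
```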
diff --git a/datafusion-examples/examples/custom_data_source/main.rs b/datafusion-examples/examples/custom_data_source/main.rs
new file mode 100644
index 0000000000..ce0585f8c3
--- /dev/null
+++ b/datafusion-examples/examples/custom_data_source/main.rs
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! # These examples are all related to extending or defining how DataFusion reads data
+//!
+//! These examples demonstrate how DataFusion reads data.
+//!
+//! ## Usage
+//! ```bash
+//! cargo run --example custom_data_source -- [csv_json_opener|csv_sql_streaming|custom_datasource|custom_file_casts|custom_file_format|file_stream_provider]
+//! ```
+//!
+//! Each subcommand runs a corresponding example:
+//! - `csv_json_opener` — use low level FileOpener APIs to read CSV/JSON into Arrow RecordBatches
+//! - `csv_sql_streaming` — build and run a streaming query plan from a SQL statement against a local CSV file
+//! - `custom_datasource` — run queries against a custom datasource (TableProvider)
+//! - `custom_file_casts` — implement custom casting rules to adapt file schemas
+//! - `custom_file_format` — write data to a custom file format
+//! - `file_stream_provider` — run a query on FileStreamProvider which implements StreamProvider for reading and writing to arbitrary stream sources/sinks
+
+mod csv_json_opener;
+mod csv_sql_streaming;
+mod custom_datasource;
+mod custom_file_casts;
+mod custom_file_format;
+mod file_stream_provider;
+
+use std::str::FromStr;
+
+use datafusion::error::{DataFusionError, Result};
+
+enum ExampleKind {
+    CsvJsonOpener,
+    CsvSqlStreaming,
+    CustomDatasource,
+    CustomFileCasts,
+    CustomFileFormat,
+    FileStreamProvider,
+}
+
+impl AsRef<str> for ExampleKind {
+    fn as_ref(&self) -> &str {
+        match self {
+            Self::CsvJsonOpener => "csv_json_opener",
+            Self::CsvSqlStreaming => "csv_sql_streaming",
+            Self::CustomDatasource => "custom_datasource",
+            Self::CustomFileCasts => "custom_file_casts",
+            Self::CustomFileFormat => "custom_file_format",
+            Self::FileStreamProvider => "file_stream_provider",
+        }
+    }
+}
+
+impl FromStr for ExampleKind {
+    type Err = DataFusionError;
+
+    fn from_str(s: &str) -> Result<Self> {
+        match s {
+            "csv_json_opener" => Ok(Self::CsvJsonOpener),
+            "csv_sql_streaming" => Ok(Self::CsvSqlStreaming),
+            "custom_datasource" => Ok(Self::CustomDatasource),
+            "custom_file_casts" => Ok(Self::CustomFileCasts),
+            "custom_file_format" => Ok(Self::CustomFileFormat),
+            "file_stream_provider" => Ok(Self::FileFtreamProvider),
+            _ => Err(DataFusionError::Execution(format!("Unknown example: 
{s}"))),
+        }
+    }
+}
+
+impl ExampleKind {
+    const ALL: [Self; 6] = [
+        Self::CsvJsonOpener,
+        Self::CsvSqlStreaming,
+        Self::CustomDatasource,
+        Self::CustomFileCasts,
+        Self::CustomFileFormat,
+        Self::FileStreamProvider,
+    ];
+
+    const EXAMPLE_NAME: &str = "custom_data_source";
+
+    fn variants() -> Vec<&'static str> {
+        Self::ALL.iter().map(|x| x.as_ref()).collect()
+    }
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let usage = format!(
+        "Usage: cargo run --example {} -- [{}]",
+        ExampleKind::EXAMPLE_NAME,
+        ExampleKind::variants().join("|")
+    );
+
+    let arg = std::env::args().nth(1).ok_or_else(|| {
+        eprintln!("{usage}");
+        DataFusionError::Execution("Missing argument".to_string())
+    })?;
+
+    match arg.parse::<ExampleKind>()? {
+        ExampleKind::CsvJsonOpener => csv_json_opener::csv_json_opener().await?,
+        ExampleKind::CsvSqlStreaming => csv_sql_streaming::csv_sql_streaming().await?,
+        ExampleKind::CustomDatasource => custom_datasource::custom_datasource().await?,
+        ExampleKind::CustomFileCasts => custom_file_casts::custom_file_casts().await?,
+        ExampleKind::CustomFileFormat => custom_file_format::custom_file_format().await?,
+        ExampleKind::FileStreamProvider => {
+            file_stream_provider::file_stream_provider().await?
+        }
+    }
+
+    Ok(())
+}
diff --git a/datafusion-examples/examples/flight/main.rs b/datafusion-examples/examples/flight/main.rs
index a448789b35..a83b19bac4 100644
--- a/datafusion-examples/examples/flight/main.rs
+++ b/datafusion-examples/examples/flight/main.rs
@@ -19,6 +19,11 @@
 //!
 //! These examples demonstrate Arrow Flight usage.
 //!
+//! ## Usage
+//! ```bash
+//! cargo run --example flight -- [client|server|sql_server]
+//! ```
+//!
 //! Each subcommand runs a corresponding example:
 //! - `client` — run a Flight client that connects to a Flight server and executes SQL queries using the Flight protocol
 //! - `server` — run DataFusion as a standalone process and execute SQL queries from a client using the Flight protocol
diff --git a/datafusion-examples/examples/udf/main.rs b/datafusion-examples/examples/udf/main.rs
index ba36dbb15c..104d373937 100644
--- a/datafusion-examples/examples/udf/main.rs
+++ b/datafusion-examples/examples/udf/main.rs
@@ -19,6 +19,11 @@
 //!
 //! These examples demonstrate user-defined functions in DataFusion.
 //!
+//! ## Usage
+//! ```bash
+//! cargo run --example udf -- [adv_udaf|adv_udf|adv_udwf|async_udf|udaf|udf|udtf|udwf]
+//! ```
+//!
 //! Each subcommand runs a corresponding example:
 //! - `adv_udaf` — user defined aggregate function example
 //! - `adv_udf` — user defined scalar function example


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
