This is an automated email from the ASF dual-hosted git repository.

jonah pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 78ee13a7cf Consolidate csv_opener.rs and json_opener.rs into a single example (#… (#13981)
78ee13a7cf is described below

commit 78ee13a7cfdb741d91be805e4117281a2054d2d3
Author: Sergey Zhukov <[email protected]>
AuthorDate: Sun Jan 5 10:56:23 2025 +0300

    Consolidate csv_opener.rs and json_opener.rs into a single example (#… (#13981)
    
    * Consolidate csv_opener.rs and json_opener.rs into a single example (#13955)
    
    * Update datafusion-examples/examples/csv_json_opener.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Update datafusion-examples/README.md
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Apply code formatting with cargo fmt
    
    ---------
    
    Co-authored-by: Sergey Zhukov <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion-examples/README.md                      |  1 +
 .../examples/{csv_opener.rs => csv_json_opener.rs} | 94 +++++++++++++++++-----
 datafusion-examples/examples/json_opener.rs        | 88 --------------------
 3 files changed, 77 insertions(+), 106 deletions(-)

diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index a3f100cbcb..b5f82b4d51 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -54,6 +54,7 @@ cargo run --example dataframe
 - [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog
 - [`composed_extension_codec`](examples/composed_extension_codec.rs): Example of using multiple extension codecs for serialization / deserialization
 - [`csv_sql_streaming.rs`](examples/csv_sql_streaming.rs): Build and run a streaming query plan from a SQL statement against a local CSV file
+- [`csv_json_opener.rs`](examples/csv_json_opener.rs): Use low level `FileOpener` APIs to read CSV/JSON into Arrow `RecordBatch`es
 - [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
 - [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
 - [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
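
Usage note (not part of the commit): per the README convention visible above (`cargo run --example dataframe`), the consolidated example should run with `cargo run --example csv_json_opener`.
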
diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_json_opener.rs
similarity index 50%
rename from datafusion-examples/examples/csv_opener.rs
rename to datafusion-examples/examples/csv_json_opener.rs
index e7b7ead109..334e4c8340 100644
--- a/datafusion-examples/examples/csv_opener.rs
+++ b/datafusion-examples/examples/csv_json_opener.rs
@@ -15,28 +15,36 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{sync::Arc, vec};
+use std::sync::Arc;
 
+use arrow_schema::{DataType, Field, Schema};
 use datafusion::{
     assert_batches_eq,
     datasource::{
         file_format::file_compression_type::FileCompressionType,
         listing::PartitionedFile,
         object_store::ObjectStoreUrl,
-        physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream},
+        physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream, JsonOpener},
     },
     error::Result,
     physical_plan::metrics::ExecutionPlanMetricsSet,
     test_util::aggr_test_schema,
 };
-
 use futures::StreamExt;
-use object_store::local::LocalFileSystem;
+use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};
 
-/// This example demonstrates a scanning against an Arrow data source (CSV) and
-/// fetching results
+/// This example demonstrates using the low level [`FileStream`] / [`FileOpener`] APIs to directly
+/// read data from (CSV/JSON) into Arrow RecordBatches.
+///
+/// If you want to query data in CSV or JSON files, see the [`dataframe.rs`] and [`sql_query.rs`] examples
 #[tokio::main]
 async fn main() -> Result<()> {
+    csv_opener().await?;
+    json_opener().await?;
+    Ok(())
+}
+
+async fn csv_opener() -> Result<()> {
     let object_store = Arc::new(LocalFileSystem::new());
     let schema = aggr_test_schema();
 
@@ -59,18 +67,17 @@ async fn main() -> Result<()> {
 
     let path = std::path::Path::new(&path).canonicalize()?;
 
-    let scan_config =
-        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone())
-            .with_projection(Some(vec![12, 0]))
-            .with_limit(Some(5))
-            .with_file(PartitionedFile::new(path.display().to_string(), 10));
-
-    let result =
-        FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())
-            .unwrap()
-            .map(|b| b.unwrap())
-            .collect::<Vec<_>>()
-            .await;
+    let scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
+        .with_projection(Some(vec![12, 0]))
+        .with_limit(Some(5))
+        .with_file(PartitionedFile::new(path.display().to_string(), 10));
+
+    let mut result = vec![];
+    let mut stream =
+        FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())?;
+    while let Some(batch) = stream.next().await.transpose()? {
+        result.push(batch);
+    }
     assert_batches_eq!(
         &[
             "+--------------------------------+----+",
@@ -87,3 +94,54 @@ async fn main() -> Result<()> {
     );
     Ok(())
 }
+
+async fn json_opener() -> Result<()> {
+    let object_store = InMemory::new();
+    let path = object_store::path::Path::from("demo.json");
+    let data = bytes::Bytes::from(
+        r#"{"num":5,"str":"test"}
+        {"num":2,"str":"hello"}
+        {"num":4,"str":"foo"}"#,
+    );
+
+    object_store.put(&path, data.into()).await?;
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("num", DataType::Int64, false),
+        Field::new("str", DataType::Utf8, false),
+    ]));
+
+    let projected = Arc::new(schema.clone().project(&[1, 0])?);
+
+    let opener = JsonOpener::new(
+        8192,
+        projected,
+        FileCompressionType::UNCOMPRESSED,
+        Arc::new(object_store),
+    );
+
+    let scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
+        .with_projection(Some(vec![1, 0]))
+        .with_limit(Some(5))
+        .with_file(PartitionedFile::new(path.to_string(), 10));
+
+    let mut stream =
+        FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())?;
+    let mut result = vec![];
+    while let Some(batch) = stream.next().await.transpose()? {
+        result.push(batch);
+    }
+    assert_batches_eq!(
+        &[
+            "+-------+-----+",
+            "| str   | num |",
+            "+-------+-----+",
+            "| test  | 5   |",
+            "| hello | 2   |",
+            "| foo   | 4   |",
+            "+-------+-----+",
+        ],
+        &result
+    );
+    Ok(())
+}
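
Aside (not part of the commit): csv_opener() and json_opener() above share one pattern, namely build a format-specific opener, describe the scan with a FileScanConfig, drive it with a FileStream, and collect the resulting RecordBatches. Below is a minimal sketch of that shared pattern as a generic helper, assuming the FileStream / FileOpener APIs used in the diff (the FileOpener trait is exported from the same physical_plan module); the helper name collect_batches is hypothetical:

    use datafusion::{
        arrow::record_batch::RecordBatch,
        datasource::physical_plan::{FileOpener, FileScanConfig, FileStream},
        error::Result,
        physical_plan::metrics::ExecutionPlanMetricsSet,
    };
    use futures::StreamExt;

    /// Hypothetical helper (not in the commit): drive `opener` over the
    /// files described by `scan_config` and collect the Arrow batches.
    async fn collect_batches<O: FileOpener>(
        scan_config: &FileScanConfig,
        opener: O,
    ) -> Result<Vec<RecordBatch>> {
        let mut stream =
            FileStream::new(scan_config, 0, opener, &ExecutionPlanMetricsSet::new())?;
        let mut batches = vec![];
        // `transpose()` turns `Option<Result<RecordBatch>>` into
        // `Result<Option<RecordBatch>>`, letting `?` propagate per-batch errors.
        while let Some(batch) = stream.next().await.transpose()? {
            batches.push(batch);
        }
        Ok(batches)
    }

With such a helper, the two functions would differ only in how they construct the opener and the FileScanConfig.
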
diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs
deleted file mode 100644
index 7bc431c5c5..0000000000
--- a/datafusion-examples/examples/json_opener.rs
+++ /dev/null
@@ -1,88 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::{sync::Arc, vec};
-
-use arrow_schema::{DataType, Field, Schema};
-use datafusion::{
-    assert_batches_eq,
-    datasource::{
-        file_format::file_compression_type::FileCompressionType,
-        listing::PartitionedFile,
-        object_store::ObjectStoreUrl,
-        physical_plan::{FileScanConfig, FileStream, JsonOpener},
-    },
-    error::Result,
-    physical_plan::metrics::ExecutionPlanMetricsSet,
-};
-
-use futures::StreamExt;
-use object_store::ObjectStore;
-
-/// This example demonstrates a scanning against an Arrow data source (JSON) and
-/// fetching results
-#[tokio::main]
-async fn main() -> Result<()> {
-    let object_store = object_store::memory::InMemory::new();
-    let path = object_store::path::Path::from("demo.json");
-    let data = bytes::Bytes::from(
-        r#"{"num":5,"str":"test"}
-        {"num":2,"str":"hello"}
-        {"num":4,"str":"foo"}"#,
-    );
-    object_store.put(&path, data.into()).await.unwrap();
-
-    let schema = Arc::new(Schema::new(vec![
-        Field::new("num", DataType::Int64, false),
-        Field::new("str", DataType::Utf8, false),
-    ]));
-
-    let projected = Arc::new(schema.clone().project(&[1, 0])?);
-
-    let opener = JsonOpener::new(
-        8192,
-        projected,
-        FileCompressionType::UNCOMPRESSED,
-        Arc::new(object_store),
-    );
-
-    let scan_config =
-        FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone())
-            .with_projection(Some(vec![1, 0]))
-            .with_limit(Some(5))
-            .with_file(PartitionedFile::new(path.to_string(), 10));
-
-    let result =
-        FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())
-            .unwrap()
-            .map(|b| b.unwrap())
-            .collect::<Vec<_>>()
-            .await;
-    assert_batches_eq!(
-        &[
-            "+-------+-----+",
-            "| str   | num |",
-            "+-------+-----+",
-            "| test  | 5   |",
-            "| hello | 2   |",
-            "| foo   | 4   |",
-            "+-------+-----+",
-        ],
-        &result
-    );
-    Ok(())
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
