This is an automated email from the ASF dual-hosted git repository.
jonah pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 78ee13a7cf Consolidate csv_opener.rs and json_opener.rs into a single example (#… (#13981)
78ee13a7cf is described below
commit 78ee13a7cfdb741d91be805e4117281a2054d2d3
Author: Sergey Zhukov <[email protected]>
AuthorDate: Sun Jan 5 10:56:23 2025 +0300
Consolidate csv_opener.rs and json_opener.rs into a single example (#… (#13981)
* Consolidate csv_opener.rs and json_opener.rs into a single example (#13955)
* Update datafusion-examples/examples/csv_json_opener.rs
Co-authored-by: Andrew Lamb <[email protected]>
* Update datafusion-examples/README.md
Co-authored-by: Andrew Lamb <[email protected]>
* Apply code formatting with cargo fmt
---------
Co-authored-by: Sergey Zhukov <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion-examples/README.md | 1 +
.../examples/{csv_opener.rs => csv_json_opener.rs} | 94 +++++++++++++++++-----
datafusion-examples/examples/json_opener.rs | 88 --------------------
3 files changed, 77 insertions(+), 106 deletions(-)
diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index a3f100cbcb..b5f82b4d51 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -54,6 +54,7 @@ cargo run --example dataframe
- [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog
- [`composed_extension_codec`](examples/composed_extension_codec.rs): Example
of using multiple extension codecs for serialization / deserialization
- [`csv_sql_streaming.rs`](examples/csv_sql_streaming.rs): Build and run a
streaming query plan from a SQL statement against a local CSV file
+- [`csv_json_opener.rs`](examples/csv_json_opener.rs): Use low level `FileOpener` APIs to read CSV/JSON into Arrow `RecordBatch`es
- [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against
a custom datasource (TableProvider)
- [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a
custom file format
- [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run
a query using a DataFrame against a parquet file from s3 and writing back to s3
diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_json_opener.rs
similarity index 50%
rename from datafusion-examples/examples/csv_opener.rs
rename to datafusion-examples/examples/csv_json_opener.rs
index e7b7ead109..334e4c8340 100644
--- a/datafusion-examples/examples/csv_opener.rs
+++ b/datafusion-examples/examples/csv_json_opener.rs
@@ -15,28 +15,36 @@
// specific language governing permissions and limitations
// under the License.
-use std::{sync::Arc, vec};
+use std::sync::Arc;
+use arrow_schema::{DataType, Field, Schema};
use datafusion::{
assert_batches_eq,
datasource::{
file_format::file_compression_type::FileCompressionType,
listing::PartitionedFile,
object_store::ObjectStoreUrl,
- physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream},
+ physical_plan::{CsvConfig, CsvOpener, FileScanConfig, FileStream, JsonOpener},
},
error::Result,
physical_plan::metrics::ExecutionPlanMetricsSet,
test_util::aggr_test_schema,
};
-
use futures::StreamExt;
-use object_store::local::LocalFileSystem;
+use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};
-/// This example demonstrates a scanning against an Arrow data source (CSV) and
-/// fetching results
+/// This example demonstrates using the low level [`FileStream`] / [`FileOpener`] APIs to directly
+/// read data from (CSV/JSON) into Arrow RecordBatches.
+///
+/// If you want to query data in CSV or JSON files, see the [`dataframe.rs`] and [`sql_query.rs`] examples
#[tokio::main]
async fn main() -> Result<()> {
+ csv_opener().await?;
+ json_opener().await?;
+ Ok(())
+}
+
+async fn csv_opener() -> Result<()> {
let object_store = Arc::new(LocalFileSystem::new());
let schema = aggr_test_schema();
@@ -59,18 +67,17 @@ async fn main() -> Result<()> {
let path = std::path::Path::new(&path).canonicalize()?;
- let scan_config =
- FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone())
- .with_projection(Some(vec![12, 0]))
- .with_limit(Some(5))
- .with_file(PartitionedFile::new(path.display().to_string(), 10));
-
- let result =
- FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())
- .unwrap()
- .map(|b| b.unwrap())
- .collect::<Vec<_>>()
- .await;
+ let scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
+ .with_projection(Some(vec![12, 0]))
+ .with_limit(Some(5))
+ .with_file(PartitionedFile::new(path.display().to_string(), 10));
+
+ let mut result = vec![];
+ let mut stream =
+ FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())?;
+ while let Some(batch) = stream.next().await.transpose()? {
+ result.push(batch);
+ }
assert_batches_eq!(
&[
"+--------------------------------+----+",
@@ -87,3 +94,54 @@ async fn main() -> Result<()> {
);
Ok(())
}
+
+async fn json_opener() -> Result<()> {
+ let object_store = InMemory::new();
+ let path = object_store::path::Path::from("demo.json");
+ let data = bytes::Bytes::from(
+ r#"{"num":5,"str":"test"}
+ {"num":2,"str":"hello"}
+ {"num":4,"str":"foo"}"#,
+ );
+
+ object_store.put(&path, data.into()).await?;
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("num", DataType::Int64, false),
+ Field::new("str", DataType::Utf8, false),
+ ]));
+
+ let projected = Arc::new(schema.clone().project(&[1, 0])?);
+
+ let opener = JsonOpener::new(
+ 8192,
+ projected,
+ FileCompressionType::UNCOMPRESSED,
+ Arc::new(object_store),
+ );
+
+ let scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
+ .with_projection(Some(vec![1, 0]))
+ .with_limit(Some(5))
+ .with_file(PartitionedFile::new(path.to_string(), 10));
+
+ let mut stream =
+ FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())?;
+ let mut result = vec![];
+ while let Some(batch) = stream.next().await.transpose()? {
+ result.push(batch);
+ }
+ assert_batches_eq!(
+ &[
+ "+-------+-----+",
+ "| str | num |",
+ "+-------+-----+",
+ "| test | 5 |",
+ "| hello | 2 |",
+ "| foo | 4 |",
+ "+-------+-----+",
+ ],
+ &result
+ );
+ Ok(())
+}
diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs
deleted file mode 100644
index 7bc431c5c5..0000000000
--- a/datafusion-examples/examples/json_opener.rs
+++ /dev/null
@@ -1,88 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::{sync::Arc, vec};
-
-use arrow_schema::{DataType, Field, Schema};
-use datafusion::{
- assert_batches_eq,
- datasource::{
- file_format::file_compression_type::FileCompressionType,
- listing::PartitionedFile,
- object_store::ObjectStoreUrl,
- physical_plan::{FileScanConfig, FileStream, JsonOpener},
- },
- error::Result,
- physical_plan::metrics::ExecutionPlanMetricsSet,
-};
-
-use futures::StreamExt;
-use object_store::ObjectStore;
-
-/// This example demonstrates a scanning against an Arrow data source (JSON) and
-/// fetching results
-#[tokio::main]
-async fn main() -> Result<()> {
- let object_store = object_store::memory::InMemory::new();
- let path = object_store::path::Path::from("demo.json");
- let data = bytes::Bytes::from(
- r#"{"num":5,"str":"test"}
- {"num":2,"str":"hello"}
- {"num":4,"str":"foo"}"#,
- );
- object_store.put(&path, data.into()).await.unwrap();
-
- let schema = Arc::new(Schema::new(vec![
- Field::new("num", DataType::Int64, false),
- Field::new("str", DataType::Utf8, false),
- ]));
-
- let projected = Arc::new(schema.clone().project(&[1, 0])?);
-
- let opener = JsonOpener::new(
- 8192,
- projected,
- FileCompressionType::UNCOMPRESSED,
- Arc::new(object_store),
- );
-
- let scan_config =
- FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema.clone())
- .with_projection(Some(vec![1, 0]))
- .with_limit(Some(5))
- .with_file(PartitionedFile::new(path.to_string(), 10));
-
- let result =
- FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())
- .unwrap()
- .map(|b| b.unwrap())
- .collect::<Vec<_>>()
- .await;
- assert_batches_eq!(
- &[
- "+-------+-----+",
- "| str | num |",
- "+-------+-----+",
- "| test | 5 |",
- "| hello | 2 |",
- "| foo | 4 |",
- "+-------+-----+",
- ],
- &result
- );
- Ok(())
-}
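For convenience, here is the JSON half of the new consolidated example collected
into a single runnable snippet, assembled from the added lines in the diff above.
The example itself can be run with "cargo run --example csv_json_opener" following
the README convention quoted earlier; the FileScanConfig / FileStream / JsonOpener
signatures are as of this commit and may differ in other DataFusion releases.

    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};
    use datafusion::{
        assert_batches_eq,
        datasource::{
            file_format::file_compression_type::FileCompressionType,
            listing::PartitionedFile,
            object_store::ObjectStoreUrl,
            physical_plan::{FileScanConfig, FileStream, JsonOpener},
        },
        error::Result,
        physical_plan::metrics::ExecutionPlanMetricsSet,
    };
    use futures::StreamExt;
    use object_store::ObjectStore;

    #[tokio::main]
    async fn main() -> Result<()> {
        json_opener().await
    }

    async fn json_opener() -> Result<()> {
        // Stage some newline-delimited JSON in an in-memory object store.
        let object_store = object_store::memory::InMemory::new();
        let path = object_store::path::Path::from("demo.json");
        let data = bytes::Bytes::from(
            r#"{"num":5,"str":"test"}
            {"num":2,"str":"hello"}
            {"num":4,"str":"foo"}"#,
        );
        object_store.put(&path, data.into()).await?;

        // File schema, plus the projected (reordered) schema handed to the opener.
        let schema = Arc::new(Schema::new(vec![
            Field::new("num", DataType::Int64, false),
            Field::new("str", DataType::Utf8, false),
        ]));
        let projected = Arc::new(schema.clone().project(&[1, 0])?);

        // The opener knows how to decode a single JSON file into RecordBatches;
        // FileScanConfig describes the overall scan (projection, limit, files).
        let opener = JsonOpener::new(
            8192,
            projected,
            FileCompressionType::UNCOMPRESSED,
            Arc::new(object_store),
        );
        let scan_config = FileScanConfig::new(ObjectStoreUrl::local_filesystem(), schema)
            .with_projection(Some(vec![1, 0]))
            .with_limit(Some(5))
            .with_file(PartitionedFile::new(path.to_string(), 10));

        // FileStream drives the opener and yields Arrow RecordBatches.
        let mut stream =
            FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new())?;
        let mut batches = vec![];
        while let Some(batch) = stream.next().await.transpose()? {
            batches.push(batch);
        }
        assert_batches_eq!(
            &[
                "+-------+-----+",
                "| str   | num |",
                "+-------+-----+",
                "| test  | 5   |",
                "| hello | 2   |",
                "| foo   | 4   |",
                "+-------+-----+",
            ],
            &batches
        );
        Ok(())
    }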
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]