etseidl commented on code in PR #6948:
URL: https://github.com/apache/arrow-rs/pull/6948#discussion_r1907717600
##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -249,53 +197,153 @@ impl ArrowReaderMetadata {
/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
pub struct AsyncReader<T>(T);
-/// A builder used to construct a [`ParquetRecordBatchStream`] for `async` reading of a parquet file
+/// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`]
///
-/// In particular, this handles reading the parquet file metadata, allowing consumers
+/// This builder handles reading the parquet file metadata, allowing consumers
/// to use this information to select what specific columns, row groups, etc...
/// they wish to be read by the resulting stream
///
+/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
+///
/// See [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
- /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+ /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
+ /// specified source.
///
/// # Example
+ /// ```
+ /// # #[tokio::main(flavor="current_thread")]
+ /// # async fn main() {
+ /// #
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow::util::pretty::pretty_format_batches;
+ /// # use futures::TryStreamExt;
+ /// #
+ /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+ /// #
+ /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
+ /// # let formatted = pretty_format_batches(batches).unwrap().to_string();
+ /// # let actual_lines: Vec<_> = formatted.trim().lines().collect();
+ /// # assert_eq!(
+ /// # &actual_lines, expected_lines,
+ /// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ /// # expected_lines, actual_lines
+ /// # );
+ /// # }
+ /// #
+ /// # let testdata = arrow::util::test_util::parquet_test_data();
+ /// # let path = format!("{}/alltypes_plain.parquet", testdata);
+ /// // use tokio::fs::File to read data using an async I/O. This can be replaced with
Review Comment:
```suggestion
/// // Use tokio::fs::File to read data using an async I/O. This can be replaced with
```
##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -15,65 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-//! Provides `async` API for reading parquet files as
+//! [`ParquetRecordBatchStreamBuilder`]: `async` API for reading Parquet files as
//! [`RecordBatch`]es
//!
-//! ```
-//! # #[tokio::main(flavor="current_thread")]
-//! # async fn main() {
-//! #
-//! # use arrow_array::RecordBatch;
-//! # use arrow::util::pretty::pretty_format_batches;
-//! # use futures::TryStreamExt;
-//! # use tokio::fs::File;
-//! #
-//! # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
-//! #
-//! # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
-//! # let formatted = pretty_format_batches(batches).unwrap().to_string();
-//! # let actual_lines: Vec<_> = formatted.trim().lines().collect();
-//! # assert_eq!(
-//! # &actual_lines, expected_lines,
-//! # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
-//! # expected_lines, actual_lines
-//! # );
-//! # }
-//! #
-//! let testdata = arrow::util::test_util::parquet_test_data();
-//! let path = format!("{}/alltypes_plain.parquet", testdata);
-//! let file = File::open(path).await.unwrap();
+//! This can be used to decode a Parquet file in streaming fashion (without
+//! downloading the whole file at once) from a remote source, such as an object store.
//!
-//! let builder = ParquetRecordBatchStreamBuilder::new(file)
-//! .await
-//! .unwrap()
-//! .with_batch_size(3);
-//!
-//! let file_metadata = builder.metadata().file_metadata();
-//! let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
-//!
-//! let stream = builder.with_projection(mask).build().unwrap();
-//! let results = stream.try_collect::<Vec<_>>().await.unwrap();
-//! assert_eq!(results.len(), 3);
-//!
-//! assert_batches_eq(
-//! &results,
-//! &[
-//! "+----------+-------------+-----------+",
-//! "| bool_col | tinyint_col | float_col |",
-//! "+----------+-------------+-----------+",
-//! "| true | 0 | 0.0 |",
-//! "| false | 1 | 1.1 |",
-//! "| true | 0 | 0.0 |",
-//! "| false | 1 | 1.1 |",
-//! "| true | 0 | 0.0 |",
-//! "| false | 1 | 1.1 |",
-//! "| true | 0 | 0.0 |",
-//! "| false | 1 | 1.1 |",
-//! "+----------+-------------+-----------+",
-//! ],
-//! );
-//! # }
-//! ```
+//! See example on [`ParquetRecordBatchStreamBuilder`]
Review Comment:
```suggestion
//! See example on [`ParquetRecordBatchStreamBuilder::new`]
```
Should this match the other references? It saves an extra mouse click.
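Aside: if the module docs are going to advertise streaming from a remote source, it might eventually be worth showing that path too. A minimal sketch of what I mean (mine, untested here; it assumes the crate's `object_store` feature and the current `ParquetObjectReader::new(store, meta)` constructor, with `LocalFileSystem` and a hypothetical path standing in for a remote store such as S3):

```rust
use std::sync::Arc;

use futures::TryStreamExt;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
use parquet::arrow::{async_reader::ParquetObjectReader, ParquetRecordBatchStreamBuilder};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Any ObjectStore implementation works; LocalFileSystem stands in for a
    // remote store here, and the path is only for illustration.
    let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
    let location = Path::from("data/alltypes_plain.parquet");

    // A HEAD request gives the file size; the reader then fetches only the
    // byte ranges it needs (the footer first, then the projected column chunks).
    let meta = store.head(&location).await.unwrap();
    let reader = ParquetObjectReader::new(store, meta);

    let stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await
        .unwrap()
        .build()
        .unwrap();

    // Batches are decoded incrementally; the whole file is never buffered.
    let batches = stream.try_collect::<Vec<_>>().await.unwrap();
    println!("read {} batches", batches.len());
}
```

The point of the sketch is that only the footer and the requested byte ranges are ever fetched, which is exactly the streaming claim the new doc text makes.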
##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -249,53 +197,153 @@ impl ArrowReaderMetadata {
/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
pub struct AsyncReader<T>(T);
-/// A builder used to construct a [`ParquetRecordBatchStream`] for `async` reading of a parquet file
+/// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`]
///
-/// In particular, this handles reading the parquet file metadata, allowing consumers
+/// This builder handles reading the parquet file metadata, allowing consumers
/// to use this information to select what specific columns, row groups, etc...
/// they wish to be read by the resulting stream
///
+/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
+///
/// See [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
- /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+ /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
+ /// specified source.
///
/// # Example
+ /// ```
+ /// # #[tokio::main(flavor="current_thread")]
+ /// # async fn main() {
+ /// #
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow::util::pretty::pretty_format_batches;
+ /// # use futures::TryStreamExt;
+ /// #
+ /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+ /// #
+ /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
+ /// # let formatted = pretty_format_batches(batches).unwrap().to_string();
+ /// # let actual_lines: Vec<_> = formatted.trim().lines().collect();
+ /// # assert_eq!(
+ /// # &actual_lines, expected_lines,
+ /// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ /// # expected_lines, actual_lines
+ /// # );
+ /// # }
+ /// #
+ /// # let testdata = arrow::util::test_util::parquet_test_data();
+ /// # let path = format!("{}/alltypes_plain.parquet", testdata);
+ /// // use tokio::fs::File to read data using an async I/O. This can be replaced with
+ /// // other async I/O reader such as a reader from an object store.
Review Comment:
```suggestion
/// // another async I/O reader such as a reader from an object store.
```
##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -249,53 +197,153 @@ impl ArrowReaderMetadata {
/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
pub struct AsyncReader<T>(T);
-/// A builder used to construct a [`ParquetRecordBatchStream`] for `async` reading of a parquet file
+/// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`]
///
-/// In particular, this handles reading the parquet file metadata, allowing consumers
+/// This builder handles reading the parquet file metadata, allowing consumers
/// to use this information to select what specific columns, row groups, etc...
/// they wish to be read by the resulting stream
///
+/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
+///
/// See [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
- /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+ /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
+ /// specified source.
///
/// # Example
+ /// ```
+ /// # #[tokio::main(flavor="current_thread")]
+ /// # async fn main() {
+ /// #
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow::util::pretty::pretty_format_batches;
+ /// # use futures::TryStreamExt;
+ /// #
+ /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+ /// #
+ /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
+ /// # let formatted = pretty_format_batches(batches).unwrap().to_string();
+ /// # let actual_lines: Vec<_> = formatted.trim().lines().collect();
+ /// # assert_eq!(
+ /// # &actual_lines, expected_lines,
+ /// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ /// # expected_lines, actual_lines
+ /// # );
+ /// # }
+ /// #
+ /// # let testdata = arrow::util::test_util::parquet_test_data();
+ /// # let path = format!("{}/alltypes_plain.parquet", testdata);
+ /// // use tokio::fs::File to read data using an async I/O. This can be replaced with
+ /// // other async I/O reader such as a reader from an object store.
+ /// let file = tokio::fs::File::open(path).await.unwrap();
+ ///
+ /// // Configure options for reading from the async souce
+ /// let builder = ParquetRecordBatchStreamBuilder::new(file)
+ /// .await
+ /// .unwrap();
+ /// // Building the stream opens the parquet file (reads metadata, etc) and returns
+ /// // a stream that can be used to incrementally read the data in batches
+ /// let stream = builder.build().unwrap();
+ /// // in this example, we collect the stream into a Vec<RecordBatch>
+ /// // but real applications would likely process the batches as they are read
+ /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
+ /// // demonstrate the results are as expected
Review Comment:
```suggestion
/// // Demonstrate the results are as expected
```
##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -249,53 +197,153 @@ impl ArrowReaderMetadata {
/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
pub struct AsyncReader<T>(T);
-/// A builder used to construct a [`ParquetRecordBatchStream`] for `async` reading of a parquet file
+/// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`]
///
-/// In particular, this handles reading the parquet file metadata, allowing consumers
+/// This builder handles reading the parquet file metadata, allowing consumers
/// to use this information to select what specific columns, row groups, etc...
/// they wish to be read by the resulting stream
///
+/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
+///
/// See [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
- /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+ /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
+ /// specified source.
///
/// # Example
+ /// ```
+ /// # #[tokio::main(flavor="current_thread")]
+ /// # async fn main() {
+ /// #
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow::util::pretty::pretty_format_batches;
+ /// # use futures::TryStreamExt;
+ /// #
+ /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+ /// #
+ /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
+ /// # let formatted = pretty_format_batches(batches).unwrap().to_string();
+ /// # let actual_lines: Vec<_> = formatted.trim().lines().collect();
+ /// # assert_eq!(
+ /// # &actual_lines, expected_lines,
+ /// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ /// # expected_lines, actual_lines
+ /// # );
+ /// # }
+ /// #
+ /// # let testdata = arrow::util::test_util::parquet_test_data();
+ /// # let path = format!("{}/alltypes_plain.parquet", testdata);
+ /// // use tokio::fs::File to read data using an async I/O. This can be replaced with
+ /// // other async I/O reader such as a reader from an object store.
+ /// let file = tokio::fs::File::open(path).await.unwrap();
+ ///
+ /// // Configure options for reading from the async souce
+ /// let builder = ParquetRecordBatchStreamBuilder::new(file)
+ /// .await
+ /// .unwrap();
+ /// // Building the stream opens the parquet file (reads metadata, etc) and returns
+ /// // a stream that can be used to incrementally read the data in batches
+ /// let stream = builder.build().unwrap();
+ /// // in this example, we collect the stream into a Vec<RecordBatch>
Review Comment:
```suggestion
/// // In this example, we collect the stream into a Vec<RecordBatch>
```
##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -249,53 +197,153 @@ impl ArrowReaderMetadata {
/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
pub struct AsyncReader<T>(T);
-/// A builder used to construct a [`ParquetRecordBatchStream`] for `async` reading of a parquet file
+/// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`]
///
-/// In particular, this handles reading the parquet file metadata, allowing consumers
+/// This builder handles reading the parquet file metadata, allowing consumers
/// to use this information to select what specific columns, row groups, etc...
/// they wish to be read by the resulting stream
///
+/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
+///
/// See [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
- /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+ /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
+ /// specified source.
///
/// # Example
+ /// ```
+ /// # #[tokio::main(flavor="current_thread")]
+ /// # async fn main() {
+ /// #
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow::util::pretty::pretty_format_batches;
+ /// # use futures::TryStreamExt;
+ /// #
+ /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+ /// #
+ /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
+ /// # let formatted = pretty_format_batches(batches).unwrap().to_string();
+ /// # let actual_lines: Vec<_> = formatted.trim().lines().collect();
+ /// # assert_eq!(
+ /// # &actual_lines, expected_lines,
+ /// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ /// # expected_lines, actual_lines
+ /// # );
+ /// # }
+ /// #
+ /// # let testdata = arrow::util::test_util::parquet_test_data();
+ /// # let path = format!("{}/alltypes_plain.parquet", testdata);
+ /// // use tokio::fs::File to read data using an async I/O. This can be replaced with
+ /// // other async I/O reader such as a reader from an object store.
+ /// let file = tokio::fs::File::open(path).await.unwrap();
+ ///
+ /// // Configure options for reading from the async souce
+ /// let builder = ParquetRecordBatchStreamBuilder::new(file)
+ /// .await
+ /// .unwrap();
+ /// // Building the stream opens the parquet file (reads metadata, etc) and returns
+ /// // a stream that can be used to incrementally read the data in batches
+ /// let stream = builder.build().unwrap();
+ /// // in this example, we collect the stream into a Vec<RecordBatch>
+ /// // but real applications would likely process the batches as they are read
+ /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
+ /// // demonstrate the results are as expected
+ /// assert_batches_eq(
+ /// &results,
+ /// &[
+ /// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ /// "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
+ /// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ /// "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
+ /// "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
+ /// "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
+ /// "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
+ /// "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
+ /// "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
+ /// "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
+ /// "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
+ /// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ /// ],
+ /// );
+ /// # }
+ /// ```
+ ///
+ /// # Example configuring options and reading metadata
+ ///
+ /// There are many options that control the behavior of the reader, such as
+ /// `with_batch_size`, `with_projection`, `with_filter`, etc...
///
/// ```
- /// # use std::fs::metadata;
- /// # use std::sync::Arc;
- /// # use bytes::Bytes;
- /// # use arrow_array::{Int32Array, RecordBatch};
- /// # use arrow_schema::{DataType, Field, Schema};
- /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
- /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
- /// # use tempfile::tempfile;
- /// # use futures::StreamExt;
/// # #[tokio::main(flavor="current_thread")]
/// # async fn main() {
/// #
- /// # let mut file = tempfile().unwrap();
- /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
- /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
- /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
- /// # writer.write(&batch).unwrap();
- /// # writer.close().unwrap();
- /// // Open async file containing parquet data
- /// let mut file = tokio::fs::File::from_std(file);
- /// // construct the reader
- /// let mut reader = ParquetRecordBatchStreamBuilder::new(file)
- /// .await.unwrap().build().unwrap();
- /// // Read batche
- /// let batch: RecordBatch = reader.next().await.unwrap().unwrap();
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow::util::pretty::pretty_format_batches;
+ /// # use futures::TryStreamExt;
+ /// #
+ /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+ /// #
+ /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
+ /// # let formatted = pretty_format_batches(batches).unwrap().to_string();
+ /// # let actual_lines: Vec<_> = formatted.trim().lines().collect();
+ /// # assert_eq!(
+ /// # &actual_lines, expected_lines,
+ /// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ /// # expected_lines, actual_lines
+ /// # );
+ /// # }
+ /// #
+ /// # let testdata = arrow::util::test_util::parquet_test_data();
+ /// # let path = format!("{}/alltypes_plain.parquet", testdata);
+ /// // as before, use tokio::fs::File to read data using an async I/O.
Review Comment:
```suggestion
/// // As before, use tokio::fs::File to read data using an async I/O.
```
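One more thought on "There are many options that control the behavior of the reader, such as `with_batch_size`, `with_projection`, `with_filter`, etc...": neither example actually exercises a filter. Not asking for it in this PR, but for the record here is a rough sketch (mine, not from this PR) of the row-filter path, assuming the `ArrowPredicateFn`/`RowFilter` API and the builder's `with_row_filter` method, against the same test file:

```rust
use arrow_array::cast::AsArray;
use arrow_array::RecordBatch;
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{}/alltypes_plain.parquet", testdata);
    let file = tokio::fs::File::open(path).await.unwrap();

    let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();

    // The predicate only needs bool_col (root column 1), so it is evaluated
    // against batches containing just that column; rows where it returns
    // false are skipped during decode.
    let predicate_mask =
        ProjectionMask::roots(builder.metadata().file_metadata().schema_descr(), [1]);
    let predicate = ArrowPredicateFn::new(predicate_mask, |batch: RecordBatch| {
        Ok(batch.column(0).as_boolean().clone())
    });

    let stream = builder
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .build()
        .unwrap();
    let batches = stream.try_collect::<Vec<_>>().await.unwrap();

    // alltypes_plain.parquet alternates true/false, so 4 of its 8 rows remain
    let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(rows, 4);
}
```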
##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -249,53 +197,153 @@ impl ArrowReaderMetadata {
/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
pub struct AsyncReader<T>(T);
-/// A builder used to construct a [`ParquetRecordBatchStream`] for `async` reading of a parquet file
+/// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`]
///
-/// In particular, this handles reading the parquet file metadata, allowing consumers
+/// This builder handles reading the parquet file metadata, allowing consumers
/// to use this information to select what specific columns, row groups, etc...
/// they wish to be read by the resulting stream
///
+/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
+///
/// See [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
- /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+ /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
+ /// specified source.
///
/// # Example
+ /// ```
+ /// # #[tokio::main(flavor="current_thread")]
+ /// # async fn main() {
+ /// #
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow::util::pretty::pretty_format_batches;
+ /// # use futures::TryStreamExt;
+ /// #
+ /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+ /// #
+ /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
+ /// # let formatted = pretty_format_batches(batches).unwrap().to_string();
+ /// # let actual_lines: Vec<_> = formatted.trim().lines().collect();
+ /// # assert_eq!(
+ /// # &actual_lines, expected_lines,
+ /// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ /// # expected_lines, actual_lines
+ /// # );
+ /// # }
+ /// #
+ /// # let testdata = arrow::util::test_util::parquet_test_data();
+ /// # let path = format!("{}/alltypes_plain.parquet", testdata);
+ /// // use tokio::fs::File to read data using an async I/O. This can be replaced with
+ /// // other async I/O reader such as a reader from an object store.
+ /// let file = tokio::fs::File::open(path).await.unwrap();
+ ///
+ /// // Configure options for reading from the async souce
+ /// let builder = ParquetRecordBatchStreamBuilder::new(file)
+ /// .await
+ /// .unwrap();
+ /// // Building the stream opens the parquet file (reads metadata, etc) and returns
+ /// // a stream that can be used to incrementally read the data in batches
+ /// let stream = builder.build().unwrap();
+ /// // in this example, we collect the stream into a Vec<RecordBatch>
+ /// // but real applications would likely process the batches as they are read
+ /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
+ /// // demonstrate the results are as expected
+ /// assert_batches_eq(
+ /// &results,
+ /// &[
+ /// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ /// "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
+ /// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ /// "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
+ /// "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
+ /// "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
+ /// "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
+ /// "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
+ /// "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
+ /// "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
+ /// "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
+ /// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ /// ],
+ /// );
+ /// # }
+ /// ```
+ ///
+ /// # Example configuring options and reading metadata
+ ///
+ /// There are many options that control the behavior of the reader, such as
+ /// `with_batch_size`, `with_projection`, `with_filter`, etc...
///
/// ```
- /// # use std::fs::metadata;
- /// # use std::sync::Arc;
- /// # use bytes::Bytes;
- /// # use arrow_array::{Int32Array, RecordBatch};
- /// # use arrow_schema::{DataType, Field, Schema};
- /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
- /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
- /// # use tempfile::tempfile;
- /// # use futures::StreamExt;
/// # #[tokio::main(flavor="current_thread")]
/// # async fn main() {
/// #
- /// # let mut file = tempfile().unwrap();
- /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
- /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
- /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
- /// # writer.write(&batch).unwrap();
- /// # writer.close().unwrap();
- /// // Open async file containing parquet data
- /// let mut file = tokio::fs::File::from_std(file);
- /// // construct the reader
- /// let mut reader = ParquetRecordBatchStreamBuilder::new(file)
- /// .await.unwrap().build().unwrap();
- /// // Read batche
- /// let batch: RecordBatch = reader.next().await.unwrap().unwrap();
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow::util::pretty::pretty_format_batches;
+ /// # use futures::TryStreamExt;
+ /// #
+ /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+ /// #
+ /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
+ /// # let formatted = pretty_format_batches(batches).unwrap().to_string();
+ /// # let actual_lines: Vec<_> = formatted.trim().lines().collect();
+ /// # assert_eq!(
+ /// # &actual_lines, expected_lines,
+ /// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ /// # expected_lines, actual_lines
+ /// # );
+ /// # }
+ /// #
+ /// # let testdata = arrow::util::test_util::parquet_test_data();
+ /// # let path = format!("{}/alltypes_plain.parquet", testdata);
+ /// // as before, use tokio::fs::File to read data using an async I/O.
+ /// let file = tokio::fs::File::open(path).await.unwrap();
+ ///
+ /// // Configure options for reading from the async source, in this case we set the batch size
+ /// // to 3 that produces 3 rows at a time.
Review Comment:
```suggestion
/// // to 3 which produces 3 rows at a time.
```
? 🤷
##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -249,53 +197,153 @@ impl ArrowReaderMetadata {
/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
pub struct AsyncReader<T>(T);
-/// A builder used to construct a [`ParquetRecordBatchStream`] for `async` reading of a parquet file
+/// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`]
///
-/// In particular, this handles reading the parquet file metadata, allowing consumers
+/// This builder handles reading the parquet file metadata, allowing consumers
/// to use this information to select what specific columns, row groups, etc...
/// they wish to be read by the resulting stream
///
+/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
+///
/// See [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
- /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+ /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
+ /// specified source.
///
/// # Example
+ /// ```
+ /// # #[tokio::main(flavor="current_thread")]
+ /// # async fn main() {
+ /// #
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow::util::pretty::pretty_format_batches;
+ /// # use futures::TryStreamExt;
+ /// #
+ /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+ /// #
+ /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
+ /// # let formatted = pretty_format_batches(batches).unwrap().to_string();
+ /// # let actual_lines: Vec<_> = formatted.trim().lines().collect();
+ /// # assert_eq!(
+ /// # &actual_lines, expected_lines,
+ /// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ /// # expected_lines, actual_lines
+ /// # );
+ /// # }
+ /// #
+ /// # let testdata = arrow::util::test_util::parquet_test_data();
+ /// # let path = format!("{}/alltypes_plain.parquet", testdata);
+ /// // use tokio::fs::File to read data using an async I/O. This can be replaced with
+ /// // other async I/O reader such as a reader from an object store.
+ /// let file = tokio::fs::File::open(path).await.unwrap();
+ ///
+ /// // Configure options for reading from the async souce
+ /// let builder = ParquetRecordBatchStreamBuilder::new(file)
+ /// .await
+ /// .unwrap();
+ /// // Building the stream opens the parquet file (reads metadata, etc) and returns
+ /// // a stream that can be used to incrementally read the data in batches
+ /// let stream = builder.build().unwrap();
+ /// // in this example, we collect the stream into a Vec<RecordBatch>
+ /// // but real applications would likely process the batches as they are read
+ /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
+ /// // demonstrate the results are as expected
+ /// assert_batches_eq(
+ /// &results,
+ /// &[
+ /// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ /// "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
+ /// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ /// "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
+ /// "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
+ /// "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
+ /// "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
+ /// "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
+ /// "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
+ /// "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
+ /// "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
+ /// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ /// ],
+ /// );
+ /// # }
+ /// ```
+ ///
+ /// # Example configuring options and reading metadata
+ ///
+ /// There are many options that control the behavior of the reader, such as
+ /// `with_batch_size`, `with_projection`, `with_filter`, etc...
///
/// ```
- /// # use std::fs::metadata;
- /// # use std::sync::Arc;
- /// # use bytes::Bytes;
- /// # use arrow_array::{Int32Array, RecordBatch};
- /// # use arrow_schema::{DataType, Field, Schema};
- /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
- /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
- /// # use tempfile::tempfile;
- /// # use futures::StreamExt;
/// # #[tokio::main(flavor="current_thread")]
/// # async fn main() {
/// #
- /// # let mut file = tempfile().unwrap();
- /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
- /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
- /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
- /// # writer.write(&batch).unwrap();
- /// # writer.close().unwrap();
- /// // Open async file containing parquet data
- /// let mut file = tokio::fs::File::from_std(file);
- /// // construct the reader
- /// let mut reader = ParquetRecordBatchStreamBuilder::new(file)
- /// .await.unwrap().build().unwrap();
- /// // Read batche
- /// let batch: RecordBatch = reader.next().await.unwrap().unwrap();
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow::util::pretty::pretty_format_batches;
+ /// # use futures::TryStreamExt;
+ /// #
+ /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+ /// #
+ /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
+ /// # let formatted = pretty_format_batches(batches).unwrap().to_string();
+ /// # let actual_lines: Vec<_> = formatted.trim().lines().collect();
+ /// # assert_eq!(
+ /// # &actual_lines, expected_lines,
+ /// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ /// # expected_lines, actual_lines
+ /// # );
+ /// # }
+ /// #
+ /// # let testdata = arrow::util::test_util::parquet_test_data();
+ /// # let path = format!("{}/alltypes_plain.parquet", testdata);
+ /// // as before, use tokio::fs::File to read data using an async I/O.
+ /// let file = tokio::fs::File::open(path).await.unwrap();
+ ///
+ /// // Configure options for reading from the async source, in this case we set the batch size
+ /// // to 3 that produces 3 rows at a time.
+ /// let builder = ParquetRecordBatchStreamBuilder::new(file)
+ /// .await
+ /// .unwrap()
+ /// .with_batch_size(3);
+ ///
+ /// // We can also read the metadata to inspect the schema and other metadata
+ /// // before actually reading the data
+ /// let file_metadata = builder.metadata().file_metadata();
+ /// // Specify that we only want to read the 1st, 2nd, and 6th columns
+ /// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
+ ///
+ /// let stream = builder.with_projection(mask).build().unwrap();
+ /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
+ /// // print out the results
Review Comment:
```suggestion
/// // Print out the results
```
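Similar note for "select what specific columns, row groups, etc...": row-group selection never gets an example either. A possible sketch (mine, not from this PR, assuming `with_row_groups` keeps its `Vec<usize>` signature):

```rust
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{}/alltypes_plain.parquet", testdata);
    let file = tokio::fs::File::open(path).await.unwrap();

    let builder = ParquetRecordBatchStreamBuilder::new(file).await.unwrap();
    // The footer metadata describes every row group, so callers can skip
    // whole row groups (e.g. using min/max statistics) before any data I/O.
    let num_row_groups = builder.metadata().num_row_groups();
    assert_eq!(num_row_groups, 1); // alltypes_plain.parquet has a single row group

    // Read only row group 0; with several row groups this is where pruning
    // decisions would go.
    let stream = builder.with_row_groups(vec![0]).build().unwrap();
    let batches = stream.try_collect::<Vec<_>>().await.unwrap();
    let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(rows, 8);
}
```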
##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -249,53 +197,153 @@ impl ArrowReaderMetadata {
/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
pub struct AsyncReader<T>(T);
-/// A builder used to construct a [`ParquetRecordBatchStream`] for `async` reading of a parquet file
+/// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`]
///
-/// In particular, this handles reading the parquet file metadata, allowing consumers
+/// This builder handles reading the parquet file metadata, allowing consumers
/// to use this information to select what specific columns, row groups, etc...
/// they wish to be read by the resulting stream
///
+/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
+///
/// See [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
- /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+ /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
+ /// specified source.
///
/// # Example
+ /// ```
+ /// # #[tokio::main(flavor="current_thread")]
+ /// # async fn main() {
+ /// #
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow::util::pretty::pretty_format_batches;
+ /// # use futures::TryStreamExt;
+ /// #
+ /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+ /// #
+ /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
+ /// # let formatted = pretty_format_batches(batches).unwrap().to_string();
+ /// # let actual_lines: Vec<_> = formatted.trim().lines().collect();
+ /// # assert_eq!(
+ /// # &actual_lines, expected_lines,
+ /// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ /// # expected_lines, actual_lines
+ /// # );
+ /// # }
+ /// #
+ /// # let testdata = arrow::util::test_util::parquet_test_data();
+ /// # let path = format!("{}/alltypes_plain.parquet", testdata);
+ /// // use tokio::fs::File to read data using an async I/O. This can be replaced with
+ /// // other async I/O reader such as a reader from an object store.
+ /// let file = tokio::fs::File::open(path).await.unwrap();
+ ///
+ /// // Configure options for reading from the async souce
+ /// let builder = ParquetRecordBatchStreamBuilder::new(file)
+ /// .await
+ /// .unwrap();
+ /// // Building the stream opens the parquet file (reads metadata, etc) and returns
+ /// // a stream that can be used to incrementally read the data in batches
+ /// let stream = builder.build().unwrap();
+ /// // in this example, we collect the stream into a Vec<RecordBatch>
+ /// // but real applications would likely process the batches as they are read
+ /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
+ /// // demonstrate the results are as expected
+ /// assert_batches_eq(
+ /// &results,
+ /// &[
+ /// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ /// "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col  | string_col | timestamp_col       |",
+ /// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ /// "| 4  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30332f30312f3039 | 30         | 2009-03-01T00:00:00 |",
+ /// "| 5  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30332f30312f3039 | 31         | 2009-03-01T00:01:00 |",
+ /// "| 6  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30342f30312f3039 | 30         | 2009-04-01T00:00:00 |",
+ /// "| 7  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30342f30312f3039 | 31         | 2009-04-01T00:01:00 |",
+ /// "| 2  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30322f30312f3039 | 30         | 2009-02-01T00:00:00 |",
+ /// "| 3  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30322f30312f3039 | 31         | 2009-02-01T00:01:00 |",
+ /// "| 0  | true     | 0           | 0            | 0       | 0          | 0.0       | 0.0        | 30312f30312f3039 | 30         | 2009-01-01T00:00:00 |",
+ /// "| 1  | false    | 1           | 1            | 1       | 10         | 1.1       | 10.1       | 30312f30312f3039 | 31         | 2009-01-01T00:01:00 |",
+ /// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ /// ],
+ /// );
+ /// # }
+ /// ```
+ ///
+ /// # Example configuring options and reading metadata
+ ///
+ /// There are many options that control the behavior of the reader, such as
+ /// `with_batch_size`, `with_projection`, `with_filter`, etc...
///
/// ```
- /// # use std::fs::metadata;
- /// # use std::sync::Arc;
- /// # use bytes::Bytes;
- /// # use arrow_array::{Int32Array, RecordBatch};
- /// # use arrow_schema::{DataType, Field, Schema};
- /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata;
- /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
- /// # use tempfile::tempfile;
- /// # use futures::StreamExt;
/// # #[tokio::main(flavor="current_thread")]
/// # async fn main() {
/// #
- /// # let mut file = tempfile().unwrap();
- /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)]));
- /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap();
- /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
- /// # writer.write(&batch).unwrap();
- /// # writer.close().unwrap();
- /// // Open async file containing parquet data
- /// let mut file = tokio::fs::File::from_std(file);
- /// // construct the reader
- /// let mut reader = ParquetRecordBatchStreamBuilder::new(file)
- /// .await.unwrap().build().unwrap();
- /// // Read batche
- /// let batch: RecordBatch = reader.next().await.unwrap().unwrap();
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow::util::pretty::pretty_format_batches;
+ /// # use futures::TryStreamExt;
+ /// #
+ /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+ /// #
+ /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
+ /// # let formatted = pretty_format_batches(batches).unwrap().to_string();
+ /// # let actual_lines: Vec<_> = formatted.trim().lines().collect();
+ /// # assert_eq!(
+ /// # &actual_lines, expected_lines,
+ /// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ /// # expected_lines, actual_lines
+ /// # );
+ /// # }
+ /// #
+ /// # let testdata = arrow::util::test_util::parquet_test_data();
+ /// # let path = format!("{}/alltypes_plain.parquet", testdata);
+ /// // as before, use tokio::fs::File to read data using an async I/O.
+ /// let file = tokio::fs::File::open(path).await.unwrap();
+ ///
+ /// // Configure options for reading from the async source, in this case we set the batch size
+ /// // to 3 that produces 3 rows at a time.
+ /// let builder = ParquetRecordBatchStreamBuilder::new(file)
+ /// .await
+ /// .unwrap()
+ /// .with_batch_size(3);
+ ///
+ /// // We can also read the metadata to inspect the schema and other metadata
+ /// // before actually reading the data
+ /// let file_metadata = builder.metadata().file_metadata();
+ /// // Specify that we only want to read the 1st, 2nd, and 6th columns
+ /// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
+ ///
+ /// let stream = builder.with_projection(mask).build().unwrap();
+ /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
+ /// // print out the results
+ /// assert_batches_eq(
+ /// &results,
+ /// &[
+ /// "+----------+-------------+-----------+",
+ /// "| bool_col | tinyint_col | float_col |",
+ /// "+----------+-------------+-----------+",
+ /// "| true | 0 | 0.0 |",
+ /// "| false | 1 | 1.1 |",
+ /// "| true | 0 | 0.0 |",
+ /// "| false | 1 | 1.1 |",
+ /// "| true | 0 | 0.0 |",
+ /// "| false | 1 | 1.1 |",
+ /// "| true | 0 | 0.0 |",
+ /// "| false | 1 | 1.1 |",
+ /// "+----------+-------------+-----------+",
+ /// ],
+ /// );
+ ///
+ /// // The results has 8 rows, so since we set the batch size to 3, we expect
+ /// // 3 batches with 3 rows each and the last batch with 2 rows.
Review Comment:
```suggestion
/// // 3 batches, two with 3 rows each and the last batch with 2 rows.
```
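For what it's worth, the corrected arithmetic (8 rows read 3 at a time gives batches of 3, 3, and 2) is easy to pin down with an assertion on the per-batch row counts. A small self-contained check (mine, same file and batch size as the example):

```rust
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{}/alltypes_plain.parquet", testdata);
    let file = tokio::fs::File::open(path).await.unwrap();

    let stream = ParquetRecordBatchStreamBuilder::new(file)
        .await
        .unwrap()
        .with_batch_size(3)
        .build()
        .unwrap();
    let results = stream.try_collect::<Vec<_>>().await.unwrap();

    // 8 rows with a batch size of 3: two full batches of 3 rows and a final
    // batch holding the remaining 2 rows
    let sizes: Vec<usize> = results.iter().map(|batch| batch.num_rows()).collect();
    assert_eq!(sizes, vec![3, 3, 2]);
}
```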
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]