This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 8745c3560b Move `ValueIter` into own module, and add public
`record_count` function (#9557)
8745c3560b is described below
commit 8745c3560ba6b688e3cb8e1599e4da82b4168be4
Author: Alexander Rafferty <[email protected]>
AuthorDate: Thu Mar 19 06:18:33 2026 +1100
Move `ValueIter` into own module, and add public `record_count` function
(#9557)
# Which issue does this PR close?
Another smaller PR extracted from #9494.
# Rationale for this change
I've moved `ValueIter` into its own module because it's already
self-contained, and because that will make it easier to review the
changes I have made to `arrow-json/src/reader/schema.rs`.
I've also added a public `record_count` method to `ValueIter`, which can
be used to simplify consuming code in DataFusion that currently tracks
the record count separately.
# What changes are included in this PR?
* Moved `ValueIter` into its own module
* Added `record_count` method to `ValueIter`
# Are these changes tested?
Yes.
# Are there any user-facing changes?
Addition of one new public method, `ValueIter::record_count`.
---
arrow-json/src/reader/mod.rs | 2 +
arrow-json/src/reader/schema.rs | 80 +---------------------------
arrow-json/src/reader/value_iter.rs | 103 ++++++++++++++++++++++++++++++++++++
3 files changed, 107 insertions(+), 78 deletions(-)
diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs
index 786cf9212d..04271368a4 100644
--- a/arrow-json/src/reader/mod.rs
+++ b/arrow-json/src/reader/mod.rs
@@ -150,6 +150,7 @@ use arrow_array::{RecordBatch, RecordBatchReader,
StructArray, downcast_integer,
use arrow_data::ArrayData;
use arrow_schema::{ArrowError, DataType, FieldRef, Schema, SchemaRef,
TimeUnit};
pub use schema::*;
+pub use value_iter::ValueIter;
use crate::reader::boolean_array::BooleanArrayDecoder;
use crate::reader::decimal_array::DecimalArrayDecoder;
@@ -179,6 +180,7 @@ mod string_view_array;
mod struct_array;
mod tape;
mod timestamp_array;
+mod value_iter;
/// A builder for [`Reader`] and [`Decoder`]
pub struct ReaderBuilder {
diff --git a/arrow-json/src/reader/schema.rs b/arrow-json/src/reader/schema.rs
index fb7d93a85e..524e6b2aa5 100644
--- a/arrow-json/src/reader/schema.rs
+++ b/arrow-json/src/reader/schema.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use super::ValueIter;
use arrow_schema::{ArrowError, DataType, Field, Fields, Schema};
use indexmap::map::IndexMap as HashMap;
use indexmap::set::IndexSet as HashSet;
@@ -127,83 +128,6 @@ fn generate_schema(spec: HashMap<String, InferredType>) ->
Result<Schema, ArrowE
Ok(Schema::new(generate_fields(&spec)?))
}
-/// JSON file reader that produces a serde_json::Value iterator from a Read
trait
-///
-/// # Example
-///
-/// ```
-/// use std::fs::File;
-/// use std::io::BufReader;
-/// use arrow_json::reader::ValueIter;
-///
-/// let mut reader =
-/// BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
-/// let mut value_reader = ValueIter::new(&mut reader, None);
-/// for value in value_reader {
-/// println!("JSON value: {}", value.unwrap());
-/// }
-/// ```
-#[derive(Debug)]
-pub struct ValueIter<R: BufRead> {
- reader: R,
- max_read_records: Option<usize>,
- record_count: usize,
- // reuse line buffer to avoid allocation on each record
- line_buf: String,
-}
-
-impl<R: BufRead> ValueIter<R> {
- /// Creates a new `ValueIter`
- pub fn new(reader: R, max_read_records: Option<usize>) -> Self {
- Self {
- reader,
- max_read_records,
- record_count: 0,
- line_buf: String::new(),
- }
- }
-}
-
-impl<R: BufRead> Iterator for ValueIter<R> {
- type Item = Result<Value, ArrowError>;
-
- fn next(&mut self) -> Option<Self::Item> {
- if let Some(max) = self.max_read_records {
- if self.record_count >= max {
- return None;
- }
- }
-
- loop {
- self.line_buf.truncate(0);
- match self.reader.read_line(&mut self.line_buf) {
- Ok(0) => {
- // read_line returns 0 when stream reached EOF
- return None;
- }
- Err(e) => {
- return Some(Err(ArrowError::JsonError(format!(
- "Failed to read JSON record: {e}"
- ))));
- }
- _ => {
- let trimmed_s = self.line_buf.trim();
- if trimmed_s.is_empty() {
- // ignore empty lines
- continue;
- }
-
- self.record_count += 1;
- return Some(
- serde_json::from_str(trimmed_s)
- .map_err(|e| ArrowError::JsonError(format!("Not
valid JSON: {e}"))),
- );
- }
- }
- }
- }
-}
-
/// Infer the fields of a JSON file by reading the first n records of the
file, with
/// `max_read_records` controlling the maximum number of records to read.
///
@@ -282,7 +206,7 @@ pub fn infer_json_schema<R: BufRead>(
) -> Result<(Schema, usize), ArrowError> {
let mut values = ValueIter::new(reader, max_read_records);
let schema = infer_json_schema_from_iterator(&mut values)?;
- Ok((schema, values.record_count))
+ Ok((schema, values.record_count()))
}
fn set_object_scalar_field_type(
diff --git a/arrow-json/src/reader/value_iter.rs
b/arrow-json/src/reader/value_iter.rs
new file mode 100644
index 0000000000..f70b893f52
--- /dev/null
+++ b/arrow-json/src/reader/value_iter.rs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::io::BufRead;
+
+use arrow_schema::ArrowError;
+use serde_json::Value;
+
+/// JSON file reader that produces a serde_json::Value iterator from a Read
trait
+///
+/// # Example
+///
+/// ```
+/// use std::fs::File;
+/// use std::io::BufReader;
+/// use arrow_json::reader::ValueIter;
+///
+/// let mut reader =
+/// BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
+/// let mut value_reader = ValueIter::new(&mut reader, None);
+/// for value in value_reader {
+/// println!("JSON value: {}", value.unwrap());
+/// }
+/// ```
+#[derive(Debug)]
+pub struct ValueIter<R: BufRead> {
+ reader: R,
+ max_read_records: Option<usize>,
+ record_count: usize,
+ // reuse line buffer to avoid allocation on each record
+ line_buf: String,
+}
+
+impl<R: BufRead> ValueIter<R> {
+ /// Creates a new `ValueIter`
+ pub fn new(reader: R, max_read_records: Option<usize>) -> Self {
+ Self {
+ reader,
+ max_read_records,
+ record_count: 0,
+ line_buf: String::new(),
+ }
+ }
+
+ /// Returns the number of records this iterator has consumed
+ pub fn record_count(&self) -> usize {
+ self.record_count
+ }
+}
+
+impl<R: BufRead> Iterator for ValueIter<R> {
+ type Item = Result<Value, ArrowError>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ if let Some(max) = self.max_read_records {
+ if self.record_count >= max {
+ return None;
+ }
+ }
+
+ loop {
+ self.line_buf.truncate(0);
+ match self.reader.read_line(&mut self.line_buf) {
+ Ok(0) => {
+ // read_line returns 0 when stream reached EOF
+ return None;
+ }
+ Err(e) => {
+ return Some(Err(ArrowError::JsonError(format!(
+ "Failed to read JSON record: {e}"
+ ))));
+ }
+ _ => {
+ let trimmed_s = self.line_buf.trim();
+ if trimmed_s.is_empty() {
+ // ignore empty lines
+ continue;
+ }
+
+ self.record_count += 1;
+ return Some(
+ serde_json::from_str(trimmed_s)
+ .map_err(|e| ArrowError::JsonError(format!("Not
valid JSON: {e}"))),
+ );
+ }
+ }
+ }
+ }
+}