This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 9fa8125fb Cleanup CSV schema inference (#4129) (#4130) (#4133)
9fa8125fb is described below
commit 9fa8125fbe14a3a85b4995617945bda51ee3b055
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Apr 27 08:33:12 2023 -0400
Cleanup CSV schema inference (#4129) (#4130) (#4133)
* Cleanup CSV schema inference (#4129) (#4130)
* Update tests
* Update parquet-fromcsv
---
arrow-csv/src/reader/mod.rs | 979 +++++++++++++-------------------
arrow-csv/src/writer.rs | 18 +-
arrow/benches/csv_reader.rs | 3 +-
arrow/examples/read_csv.rs | 6 +-
arrow/examples/read_csv_infer_schema.rs | 13 +-
parquet/src/bin/parquet-fromcsv.rs | 3 +-
6 files changed, 405 insertions(+), 617 deletions(-)
diff --git a/arrow-csv/src/reader/mod.rs b/arrow-csv/src/reader/mod.rs
index 5bfcbc645..74294f42e 100644
--- a/arrow-csv/src/reader/mod.rs
+++ b/arrow-csv/src/reader/mod.rs
@@ -26,7 +26,7 @@
//!
//! ```
//! # use arrow_schema::*;
-//! # use arrow_csv::Reader;
+//! # use arrow_csv::{Reader, ReaderBuilder};
//! # use std::fs::File;
//! # use std::sync::Arc;
//!
@@ -38,7 +38,7 @@
//!
//! let file = File::open("test/data/uk_cities.csv").unwrap();
//!
-//! let mut csv = Reader::new(file, Arc::new(schema), false, None, 1024, None,
None, None);
+//! let mut csv = ReaderBuilder::new(Arc::new(schema)).build(file).unwrap();
//! let batch = csv.next().unwrap().unwrap();
//! ```
//!
@@ -131,8 +131,9 @@ use arrow_array::*;
use arrow_cast::parse::{parse_decimal, string_to_datetime, Parser};
use arrow_schema::*;
use chrono::{TimeZone, Utc};
+use csv::StringRecord;
use lazy_static::lazy_static;
-use regex::{Regex, RegexSet};
+use regex::RegexSet;
use std::fmt;
use std::fs::File;
use std::io::{BufRead, BufReader as StdBufReader, Read, Seek, SeekFrom};
@@ -141,7 +142,6 @@ use std::sync::Arc;
use crate::map_csv_error;
use crate::reader::records::{RecordDecoder, StringRecords};
use arrow_array::timezone::Tz;
-use csv::StringRecord;
lazy_static! {
/// Order should match [`InferredDataType`]
@@ -194,32 +194,150 @@ impl InferredDataType {
}
/// Updates the [`InferredDataType`] with the given string
- fn update(&mut self, string: &str, datetime_re: Option<&Regex>) {
+ fn update(&mut self, string: &str) {
self.packed |= if string.starts_with('"') {
1 << 8 // Utf8
} else if let Some(m) = REGEX_SET.matches(string).into_iter().next() {
1 << m
} else {
- match datetime_re {
- // Timestamp(Nanosecond)
- Some(d) if d.is_match(string) => 1 << 7,
- _ => 1 << 8, // Utf8
- }
+ 1 << 8 // Utf8
}
}
}
-/// This is a collection of options for csv reader when the builder pattern
cannot be used
-/// and the parameters need to be passed around
-#[derive(Debug, Default, Clone)]
-struct ReaderOptions {
+/// The format specification for the CSV file
+#[derive(Debug, Clone, Default)]
+pub struct Format {
has_header: bool,
delimiter: Option<u8>,
escape: Option<u8>,
quote: Option<u8>,
terminator: Option<u8>,
- max_read_records: Option<usize>,
- datetime_re: Option<Regex>,
+}
+
+impl Format {
+ pub fn with_header(mut self, has_header: bool) -> Self {
+ self.has_header = has_header;
+ self
+ }
+
+ pub fn with_delimiter(mut self, delimiter: u8) -> Self {
+ self.delimiter = Some(delimiter);
+ self
+ }
+
+ pub fn with_escape(mut self, escape: u8) -> Self {
+ self.escape = Some(escape);
+ self
+ }
+
+ pub fn with_quote(mut self, quote: u8) -> Self {
+ self.quote = Some(quote);
+ self
+ }
+
+ pub fn with_terminator(mut self, terminator: u8) -> Self {
+ self.terminator = Some(terminator);
+ self
+ }
+
+ /// Infer schema of CSV records from the provided `reader`
+ ///
+ /// If `max_records` is `None`, all records will be read, otherwise up to
`max_records`
+ /// records are read to infer the schema
+ ///
+ /// Returns inferred schema and number of records read
+ pub fn infer_schema<R: Read>(
+ &self,
+ reader: R,
+ max_records: Option<usize>,
+ ) -> Result<(Schema, usize), ArrowError> {
+ let mut csv_reader = self.build_reader(reader);
+
+ // get or create header names
+ // when has_header is false, creates default column names with column_
prefix
+ let headers: Vec<String> = if self.has_header {
+ let headers =
&csv_reader.headers().map_err(map_csv_error)?.clone();
+ headers.iter().map(|s| s.to_string()).collect()
+ } else {
+ let first_record_count =
&csv_reader.headers().map_err(map_csv_error)?.len();
+ (0..*first_record_count)
+ .map(|i| format!("column_{}", i + 1))
+ .collect()
+ };
+
+ let header_length = headers.len();
+ // keep track of inferred field types
+ let mut column_types: Vec<InferredDataType> =
+ vec![Default::default(); header_length];
+
+ let mut records_count = 0;
+
+ let mut record = StringRecord::new();
+ let max_records = max_records.unwrap_or(usize::MAX);
+ while records_count < max_records {
+ if !csv_reader.read_record(&mut record).map_err(map_csv_error)? {
+ break;
+ }
+ records_count += 1;
+
+ // Note since we may be looking at a sample of the data, we make
the safe assumption that
+ // they could be nullable
+ for (i, column_type) in
+ column_types.iter_mut().enumerate().take(header_length)
+ {
+ if let Some(string) = record.get(i) {
+ if !string.is_empty() {
+ column_type.update(string)
+ }
+ }
+ }
+ }
+
+ // build schema from inference results
+ let fields: Fields = column_types
+ .iter()
+ .zip(&headers)
+ .map(|(inferred, field_name)| Field::new(field_name,
inferred.get(), true))
+ .collect();
+
+ Ok((Schema::new(fields), records_count))
+ }
+
+ /// Build a [`csv::Reader`] for this [`Format`]
+ fn build_reader<R: Read>(&self, reader: R) -> csv::Reader<R> {
+ let mut builder = csv::ReaderBuilder::new();
+ builder.has_headers(self.has_header);
+
+ if let Some(c) = self.delimiter {
+ builder.delimiter(c);
+ }
+ builder.escape(self.escape);
+ if let Some(c) = self.quote {
+ builder.quote(c);
+ }
+ if let Some(t) = self.terminator {
+ builder.terminator(csv::Terminator::Any(t));
+ }
+ builder.from_reader(reader)
+ }
+
+ /// Build a [`csv_core::Reader`] for this [`Format`]
+ fn build_parser(&self) -> csv_core::Reader {
+ let mut builder = csv_core::ReaderBuilder::new();
+ builder.escape(self.escape);
+
+ if let Some(c) = self.delimiter {
+ builder.delimiter(c);
+ }
+ if let Some(c) = self.quote {
+ builder.quote(c);
+ }
+ if let Some(t) = self.terminator {
+ builder.terminator(csv_core::Terminator::Any(t));
+ }
+ builder.build()
+ }
}
/// Infer the schema of a CSV file by reading through the first n records of
the file,
@@ -231,34 +349,19 @@ struct ReaderOptions {
/// reader cursor offset.
///
/// The inferred schema will always have each field set as nullable.
+#[deprecated(note = "Use Format::infer_schema")]
+#[allow(deprecated)]
pub fn infer_file_schema<R: Read + Seek>(
- reader: R,
+ mut reader: R,
delimiter: u8,
max_read_records: Option<usize>,
has_header: bool,
-) -> Result<(Schema, usize), ArrowError> {
- let roptions = ReaderOptions {
- delimiter: Some(delimiter),
- max_read_records,
- has_header,
- ..Default::default()
- };
-
- infer_file_schema_with_csv_options(reader, roptions)
-}
-
-fn infer_file_schema_with_csv_options<R: Read + Seek>(
- mut reader: R,
- roptions: ReaderOptions,
) -> Result<(Schema, usize), ArrowError> {
let saved_offset = reader.stream_position()?;
-
- let (schema, records_count) =
- infer_reader_schema_with_csv_options(&mut reader, roptions)?;
+ let r = infer_reader_schema(&mut reader, delimiter, max_read_records,
has_header)?;
// return the reader seek back to the start
reader.seek(SeekFrom::Start(saved_offset))?;
-
- Ok((schema, records_count))
+ Ok(r)
}
/// Infer schema of CSV records provided by struct that implements `Read`
trait.
@@ -267,104 +370,19 @@ fn infer_file_schema_with_csv_options<R: Read + Seek>(
/// not set, all records are read to infer the schema.
///
/// Return inferred schema and number of records used for inference.
+#[deprecated(note = "Use Format::infer_schema")]
pub fn infer_reader_schema<R: Read>(
reader: R,
delimiter: u8,
max_read_records: Option<usize>,
has_header: bool,
) -> Result<(Schema, usize), ArrowError> {
- let roptions = ReaderOptions {
+ let format = Format {
delimiter: Some(delimiter),
- max_read_records,
has_header,
..Default::default()
};
- infer_reader_schema_with_csv_options(reader, roptions)
-}
-
-/// Creates a `csv::Reader`
-fn build_csv_reader<R: Read>(
- reader: R,
- has_header: bool,
- delimiter: Option<u8>,
- escape: Option<u8>,
- quote: Option<u8>,
- terminator: Option<u8>,
-) -> csv::Reader<R> {
- let mut reader_builder = csv::ReaderBuilder::new();
- reader_builder.has_headers(has_header);
-
- if let Some(c) = delimiter {
- reader_builder.delimiter(c);
- }
- reader_builder.escape(escape);
- if let Some(c) = quote {
- reader_builder.quote(c);
- }
- if let Some(t) = terminator {
- reader_builder.terminator(csv::Terminator::Any(t));
- }
- reader_builder.from_reader(reader)
-}
-
-fn infer_reader_schema_with_csv_options<R: Read>(
- reader: R,
- roptions: ReaderOptions,
-) -> Result<(Schema, usize), ArrowError> {
- let mut csv_reader = build_csv_reader(
- reader,
- roptions.has_header,
- roptions.delimiter,
- roptions.escape,
- roptions.quote,
- roptions.terminator,
- );
-
- // get or create header names
- // when has_header is false, creates default column names with column_
prefix
- let headers: Vec<String> = if roptions.has_header {
- let headers = &csv_reader.headers().map_err(map_csv_error)?.clone();
- headers.iter().map(|s| s.to_string()).collect()
- } else {
- let first_record_count =
&csv_reader.headers().map_err(map_csv_error)?.len();
- (0..*first_record_count)
- .map(|i| format!("column_{}", i + 1))
- .collect()
- };
-
- let header_length = headers.len();
- // keep track of inferred field types
- let mut column_types: Vec<InferredDataType> = vec![Default::default();
header_length];
-
- let mut records_count = 0;
-
- let mut record = StringRecord::new();
- let max_records = roptions.max_read_records.unwrap_or(usize::MAX);
- while records_count < max_records {
- if !csv_reader.read_record(&mut record).map_err(map_csv_error)? {
- break;
- }
- records_count += 1;
-
- // Note since we may be looking at a sample of the data, we make the
safe assumption that
- // they could be nullable
- for (i, column_type) in
column_types.iter_mut().enumerate().take(header_length) {
- if let Some(string) = record.get(i) {
- if !string.is_empty() {
- column_type.update(string, roptions.datetime_re.as_ref())
- }
- }
- }
- }
-
- // build schema from inference results
- let fields: Fields = column_types
- .iter()
- .zip(&headers)
- .map(|(inferred, field_name)| Field::new(field_name, inferred.get(),
true))
- .collect();
-
- Ok((Schema::new(fields), records_count))
+ format.infer_schema(reader, max_read_records)
}
/// Infer schema from a list of CSV files by reading through first n records
@@ -381,14 +399,15 @@ pub fn infer_schema_from_files(
) -> Result<Schema, ArrowError> {
let mut schemas = vec![];
let mut records_to_read = max_read_records.unwrap_or(usize::MAX);
+ let format = Format {
+ delimiter: Some(delimiter),
+ has_header,
+ ..Default::default()
+ };
for fname in files.iter() {
- let (schema, records_read) = infer_file_schema(
- &mut File::open(fname)?,
- delimiter,
- Some(records_to_read),
- has_header,
- )?;
+ let f = File::open(fname)?;
+ let (schema, records_read) = format.infer_schema(f,
Some(records_to_read))?;
if records_read == 0 {
continue;
}
@@ -429,46 +448,6 @@ where
}
impl<R: Read> Reader<R> {
- /// Create a new CsvReader from any value that implements the `Read` trait.
- ///
- /// If reading a `File` or an input that supports `std::io::Read` and
`std::io::Seek`;
- /// you can customise the Reader, such as to enable schema inference, use
- /// `ReaderBuilder`.
- #[allow(clippy::too_many_arguments)]
- pub fn new(
- reader: R,
- schema: SchemaRef,
- has_header: bool,
- delimiter: Option<u8>,
- batch_size: usize,
- bounds: Bounds,
- projection: Option<Vec<usize>>,
- datetime_format: Option<String>,
- ) -> Self {
- let mut builder = ReaderBuilder::new()
- .has_header(has_header)
- .with_batch_size(batch_size)
- .with_schema(schema);
-
- if let Some(delimiter) = delimiter {
- builder = builder.with_delimiter(delimiter);
- }
- if let Some((start, end)) = bounds {
- builder = builder.with_bounds(start, end);
- }
- if let Some(projection) = projection {
- builder = builder.with_projection(projection)
- }
- if let Some(format) = datetime_format {
- builder = builder.with_datetime_format(format)
- }
-
- Self {
- decoder: builder.build_decoder(),
- reader: StdBufReader::new(reader),
- }
- }
-
/// Returns the schema of the reader, useful for getting the schema
without reading
/// record batches
pub fn schema(&self) -> SchemaRef {
@@ -481,34 +460,6 @@ impl<R: Read> Reader<R> {
None => self.decoder.schema.clone(),
}
}
-
- /// Create a new CsvReader from a Reader
- ///
- /// This constructor allows you more flexibility in what records are
processed by the
- /// csv reader.
- #[allow(clippy::too_many_arguments)]
- #[deprecated(note = "Use Reader::new or ReaderBuilder")]
- pub fn from_reader(
- reader: R,
- schema: SchemaRef,
- has_header: bool,
- delimiter: Option<u8>,
- batch_size: usize,
- bounds: Bounds,
- projection: Option<Vec<usize>>,
- datetime_format: Option<String>,
- ) -> Self {
- Self::new(
- reader,
- schema,
- has_header,
- delimiter,
- batch_size,
- bounds,
- projection,
- datetime_format,
- )
- }
}
impl<R: BufRead> BufReader<R> {
@@ -558,8 +509,7 @@ impl<R: BufRead> Iterator for BufReader<R> {
/// schema: SchemaRef,
/// batch_size: usize,
/// ) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>,
ArrowError> {
-/// let mut decoder = ReaderBuilder::new()
-/// .with_schema(schema)
+/// let mut decoder = ReaderBuilder::new(schema)
/// .with_batch_size(batch_size)
/// .build_decoder();
///
@@ -601,11 +551,6 @@ pub struct Decoder {
/// A decoder for [`StringRecords`]
record_decoder: RecordDecoder,
-
- /// datetime format used to parse datetime values, (format understood by
chrono)
- ///
- /// For format refer to [chrono
docs](https://docs.rs/chrono/0.4.19/chrono/format/strftime/index.html)
- datetime_format: Option<String>,
}
impl Decoder {
@@ -652,7 +597,6 @@ impl Decoder {
Some(self.schema.metadata.clone()),
self.projection.as_ref(),
self.line_number,
- self.datetime_format.as_deref(),
)?;
self.line_number += rows.len();
Ok(Some(batch))
@@ -671,7 +615,6 @@ fn parse(
metadata: Option<std::collections::HashMap<String, String>>,
projection: Option<&Vec<usize>>,
line_number: usize,
- datetime_format: Option<&str>,
) -> Result<RecordBatch, ArrowError> {
let projection: Vec<usize> = match projection {
Some(v) => v.clone(),
@@ -703,63 +646,52 @@ fn parse(
*scale,
)
}
- DataType::Int8 => {
- build_primitive_array::<Int8Type>(line_number, rows, i,
None)
- }
+ DataType::Int8 =>
build_primitive_array::<Int8Type>(line_number, rows, i),
DataType::Int16 => {
- build_primitive_array::<Int16Type>(line_number, rows, i,
None)
+ build_primitive_array::<Int16Type>(line_number, rows, i)
}
DataType::Int32 => {
- build_primitive_array::<Int32Type>(line_number, rows, i,
None)
+ build_primitive_array::<Int32Type>(line_number, rows, i)
}
DataType::Int64 => {
- build_primitive_array::<Int64Type>(line_number, rows, i,
None)
+ build_primitive_array::<Int64Type>(line_number, rows, i)
}
DataType::UInt8 => {
- build_primitive_array::<UInt8Type>(line_number, rows, i,
None)
+ build_primitive_array::<UInt8Type>(line_number, rows, i)
}
DataType::UInt16 => {
- build_primitive_array::<UInt16Type>(line_number, rows, i,
None)
+ build_primitive_array::<UInt16Type>(line_number, rows, i)
}
DataType::UInt32 => {
- build_primitive_array::<UInt32Type>(line_number, rows, i,
None)
+ build_primitive_array::<UInt32Type>(line_number, rows, i)
}
DataType::UInt64 => {
- build_primitive_array::<UInt64Type>(line_number, rows, i,
None)
+ build_primitive_array::<UInt64Type>(line_number, rows, i)
}
DataType::Float32 => {
- build_primitive_array::<Float32Type>(line_number, rows, i,
None)
+ build_primitive_array::<Float32Type>(line_number, rows, i)
}
DataType::Float64 => {
- build_primitive_array::<Float64Type>(line_number, rows, i,
None)
+ build_primitive_array::<Float64Type>(line_number, rows, i)
}
DataType::Date32 => {
- build_primitive_array::<Date32Type>(line_number, rows, i,
None)
+ build_primitive_array::<Date32Type>(line_number, rows, i)
+ }
+ DataType::Date64 => {
+ build_primitive_array::<Date64Type>(line_number, rows, i)
}
- DataType::Date64 => build_primitive_array::<Date64Type>(
- line_number,
- rows,
- i,
- datetime_format,
- ),
DataType::Time32(TimeUnit::Second) => {
- build_primitive_array::<Time32SecondType>(line_number,
rows, i, None)
+ build_primitive_array::<Time32SecondType>(line_number,
rows, i)
+ }
+ DataType::Time32(TimeUnit::Millisecond) => {
+
build_primitive_array::<Time32MillisecondType>(line_number, rows, i)
+ }
+ DataType::Time64(TimeUnit::Microsecond) => {
+
build_primitive_array::<Time64MicrosecondType>(line_number, rows, i)
+ }
+ DataType::Time64(TimeUnit::Nanosecond) => {
+ build_primitive_array::<Time64NanosecondType>(line_number,
rows, i)
}
- DataType::Time32(TimeUnit::Millisecond) =>
build_primitive_array::<
- Time32MillisecondType,
- >(
- line_number, rows, i, None
- ),
- DataType::Time64(TimeUnit::Microsecond) =>
build_primitive_array::<
- Time64MicrosecondType,
- >(
- line_number, rows, i, None
- ),
- DataType::Time64(TimeUnit::Nanosecond) =>
build_primitive_array::<
- Time64NanosecondType,
- >(
- line_number, rows, i, None
- ),
DataType::Timestamp(TimeUnit::Second, tz) => {
build_timestamp_array::<TimestampSecondType>(
line_number,
@@ -871,13 +803,6 @@ fn parse(
)
})
}
-fn parse_item<T: Parser>(string: &str) -> Option<T::Native> {
- T::parse(string)
-}
-
-fn parse_formatted<T: Parser>(string: &str, format: &str) -> Option<T::Native>
{
- T::parse_formatted(string, format)
-}
fn parse_bool(string: &str) -> Option<bool> {
if string.eq_ignore_ascii_case("false") {
@@ -928,7 +853,6 @@ fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
line_number: usize,
rows: &StringRecords<'_>,
col_idx: usize,
- format: Option<&str>,
) -> Result<ArrayRef, ArrowError> {
rows.iter()
.enumerate()
@@ -938,11 +862,7 @@ fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
return Ok(None);
}
- let parsed = match format {
- Some(format) => parse_formatted::<T>(s, format),
- _ => parse_item::<T>(s),
- };
- match parsed {
+ match T::parse(s) {
Some(e) => Ok(Some(e)),
None => Err(ArrowError::ParseError(format!(
// TODO: we should surface the underlying error here.
@@ -1037,28 +957,10 @@ fn build_boolean_array(
/// CSV file reader builder
#[derive(Debug)]
pub struct ReaderBuilder {
- /// Optional schema for the CSV file
- ///
- /// If the schema is not supplied, the reader will try to infer the schema
- /// based on the CSV structure.
- schema: Option<SchemaRef>,
- /// Whether the file has headers or not
- ///
- /// If schema inference is run on a file with no headers, default column
names
- /// are created.
- has_header: bool,
- /// An optional column delimiter. Defaults to `b','`
- delimiter: Option<u8>,
- /// An optional escape character. Defaults None
- escape: Option<u8>,
- /// An optional quote character. Defaults b'\"'
- quote: Option<u8>,
- /// An optional record terminator. Defaults CRLF
- terminator: Option<u8>,
- /// Optional maximum number of records to read during schema inference
- ///
- /// If a number is not provided, all the records are read.
- max_records: Option<usize>,
+ /// Schema of the CSV file
+ schema: SchemaRef,
+ /// Format of the CSV file
+ format: Format,
/// Batch size (number of records to load each time)
///
/// The default batch size when using the `ReaderBuilder` is 1024 records
@@ -1067,29 +969,6 @@ pub struct ReaderBuilder {
bounds: Bounds,
/// Optional projection for which columns to load (zero-based column
indices)
projection: Option<Vec<usize>>,
- /// DateTime format to be used while trying to infer datetime format
- datetime_re: Option<Regex>,
- /// DateTime format to be used while parsing datetime format
- datetime_format: Option<String>,
-}
-
-impl Default for ReaderBuilder {
- fn default() -> Self {
- Self {
- schema: None,
- has_header: false,
- delimiter: None,
- escape: None,
- quote: None,
- terminator: None,
- max_records: None,
- batch_size: 1024,
- bounds: None,
- projection: None,
- datetime_re: None,
- datetime_format: None,
- }
- }
}
impl ReaderBuilder {
@@ -1100,79 +979,60 @@ impl ReaderBuilder {
/// # Example
///
/// ```
- /// use arrow_csv::{Reader, ReaderBuilder};
- /// use std::fs::File;
- ///
- /// fn example() -> Reader<File> {
- /// let file =
File::open("test/data/uk_cities_with_headers.csv").unwrap();
- ///
- /// // create a builder, inferring the schema with the first 100
records
- /// let builder = ReaderBuilder::new().infer_schema(Some(100));
- ///
- /// let reader = builder.build(file).unwrap();
+ /// # use arrow_csv::{Reader, ReaderBuilder};
+ /// # use std::fs::File;
+ /// # use std::io::Seek;
+ /// # use std::sync::Arc;
+ /// # use arrow_csv::reader::Format;
+ /// #
+ /// let mut file =
File::open("test/data/uk_cities_with_headers.csv").unwrap();
+ /// // Infer the schema with the first 100 records
+ /// let (schema, _) = Format::default().infer_schema(&mut file,
Some(100)).unwrap();
+ /// file.rewind().unwrap();
///
- /// reader
- /// }
+ /// // create a builder
+ /// ReaderBuilder::new(Arc::new(schema)).build(file).unwrap();
/// ```
- pub fn new() -> ReaderBuilder {
- ReaderBuilder::default()
- }
-
- /// Set the CSV file's schema
- pub fn with_schema(mut self, schema: SchemaRef) -> Self {
- self.schema = Some(schema);
- self
+ pub fn new(schema: SchemaRef) -> ReaderBuilder {
+ Self {
+ schema,
+ format: Format::default(),
+ batch_size: 1024,
+ bounds: None,
+ projection: None,
+ }
}
/// Set whether the CSV file has headers
pub fn has_header(mut self, has_header: bool) -> Self {
- self.has_header = has_header;
+ self.format.has_header = has_header;
self
}
- /// Set the datetime regex used to parse the string to Date64Type
- /// this regex is used while inferring schema
- pub fn with_datetime_re(mut self, datetime_re: Regex) -> Self {
- self.datetime_re = Some(datetime_re);
- self
- }
-
- /// Set the datetime format used to parse the string to Date64Type
- /// this format is used while when the schema wants to parse Date64Type.
- ///
- /// For format refer to [chrono
docs](https://docs.rs/chrono/0.4.19/chrono/format/strftime/index.html)
- ///
- pub fn with_datetime_format(mut self, datetime_format: String) -> Self {
- self.datetime_format = Some(datetime_format);
+ /// Overrides the [`Format`] of this [`ReaderBuilder`]
+ pub fn with_format(mut self, format: Format) -> Self {
+ self.format = format;
self
}
/// Set the CSV file's column delimiter as a byte character
pub fn with_delimiter(mut self, delimiter: u8) -> Self {
- self.delimiter = Some(delimiter);
+ self.format.delimiter = Some(delimiter);
self
}
pub fn with_escape(mut self, escape: u8) -> Self {
- self.escape = Some(escape);
+ self.format.escape = Some(escape);
self
}
pub fn with_quote(mut self, quote: u8) -> Self {
- self.quote = Some(quote);
+ self.format.quote = Some(quote);
self
}
pub fn with_terminator(mut self, terminator: u8) -> Self {
- self.terminator = Some(terminator);
- self
- }
-
- /// Set the CSV reader to infer the schema of the file
- pub fn infer_schema(mut self, max_records: Option<usize>) -> Self {
- // remove any schema that is set
- self.schema = None;
- self.max_records = max_records;
+ self.format.terminator = Some(terminator);
self
}
@@ -1199,32 +1059,15 @@ impl ReaderBuilder {
///
/// If `R: BufRead` consider using [`Self::build_buffered`] to avoid
unnecessary additional
/// buffering, as internally this method wraps `reader` in
[`std::io::BufReader`]
- pub fn build<R: Read + Seek>(self, reader: R) -> Result<Reader<R>,
ArrowError> {
+ pub fn build<R: Read>(self, reader: R) -> Result<Reader<R>, ArrowError> {
self.build_buffered(StdBufReader::new(reader))
}
/// Create a new `BufReader` from a buffered reader
- pub fn build_buffered<R: BufRead + Seek>(
- mut self,
- mut reader: R,
+ pub fn build_buffered<R: BufRead>(
+ self,
+ reader: R,
) -> Result<BufReader<R>, ArrowError> {
- // check if schema should be inferred
- if self.schema.is_none() {
- let delimiter = self.delimiter.unwrap_or(b',');
- let roptions = ReaderOptions {
- delimiter: Some(delimiter),
- max_read_records: self.max_records,
- has_header: self.has_header,
- escape: self.escape,
- quote: self.quote,
- terminator: self.terminator,
- datetime_re: self.datetime_re.take(),
- };
- let (inferred_schema, _) =
- infer_file_schema_with_csv_options(&mut reader, roptions)?;
- self.schema = Some(Arc::new(inferred_schema))
- }
-
Ok(BufReader {
reader,
decoder: self.build_decoder(),
@@ -1232,28 +1075,11 @@ impl ReaderBuilder {
}
/// Builds a decoder that can be used to decode CSV from an arbitrary byte
stream
- ///
- /// # Panics
- ///
- /// This method panics if no schema provided
pub fn build_decoder(self) -> Decoder {
- let schema = self.schema.expect("schema should be provided");
- let mut reader_builder = csv_core::ReaderBuilder::new();
- reader_builder.escape(self.escape);
-
- if let Some(c) = self.delimiter {
- reader_builder.delimiter(c);
- }
- if let Some(c) = self.quote {
- reader_builder.quote(c);
- }
- if let Some(t) = self.terminator {
- reader_builder.terminator(csv_core::Terminator::Any(t));
- }
- let delimiter = reader_builder.build();
- let record_decoder = RecordDecoder::new(delimiter,
schema.fields().len());
+ let delimiter = self.format.build_parser();
+ let record_decoder = RecordDecoder::new(delimiter,
self.schema.fields().len());
- let header = self.has_header as usize;
+ let header = self.format.has_header as usize;
let (start, end) = match self.bounds {
Some((start, end)) => (start + header, end + header),
@@ -1261,13 +1087,12 @@ impl ReaderBuilder {
};
Decoder {
- schema,
+ schema: self.schema,
to_skip: start,
record_decoder,
line_number: start,
end,
projection: self.projection,
- datetime_format: self.datetime_format,
batch_size: self.batch_size,
}
}
@@ -1284,74 +1109,46 @@ mod tests {
#[test]
fn test_csv() {
- for format in [None, Some("%Y-%m-%dT%H:%M:%S%.f%:z".to_string())] {
- let schema = Schema::new(vec![
- Field::new("city", DataType::Utf8, false),
- Field::new("lat", DataType::Float64, false),
- Field::new("lng", DataType::Float64, false),
- ]);
-
- let file = File::open("test/data/uk_cities.csv").unwrap();
- let mut csv = Reader::new(
- file,
- Arc::new(schema.clone()),
- false,
- None,
- 1024,
- None,
- None,
- format,
- );
- assert_eq!(Arc::new(schema), csv.schema());
- let batch = csv.next().unwrap().unwrap();
- assert_eq!(37, batch.num_rows());
- assert_eq!(3, batch.num_columns());
-
- // access data from a primitive array
- let lat = batch
- .column(1)
- .as_any()
- .downcast_ref::<Float64Array>()
- .unwrap();
- assert_eq!(57.653484, lat.value(0));
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("city", DataType::Utf8, false),
+ Field::new("lat", DataType::Float64, false),
+ Field::new("lng", DataType::Float64, false),
+ ]));
- // access data from a string array (ListArray<u8>)
- let city = batch
- .column(0)
- .as_any()
- .downcast_ref::<StringArray>()
- .unwrap();
+ let file = File::open("test/data/uk_cities.csv").unwrap();
+ let mut csv = ReaderBuilder::new(schema.clone()).build(file).unwrap();
+ assert_eq!(schema, csv.schema());
+ let batch = csv.next().unwrap().unwrap();
+ assert_eq!(37, batch.num_rows());
+ assert_eq!(3, batch.num_columns());
- assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13));
- }
+ // access data from a primitive array
+ let lat = batch.column(1).as_primitive::<Float64Type>();
+ assert_eq!(57.653484, lat.value(0));
+
+ // access data from a string array (ListArray<u8>)
+ let city = batch.column(0).as_string::<i32>();
+
+ assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13));
}
#[test]
fn test_csv_schema_metadata() {
let mut metadata = std::collections::HashMap::new();
metadata.insert("foo".to_owned(), "bar".to_owned());
- let schema = Schema::new_with_metadata(
+ let schema = Arc::new(Schema::new_with_metadata(
vec![
Field::new("city", DataType::Utf8, false),
Field::new("lat", DataType::Float64, false),
Field::new("lng", DataType::Float64, false),
],
metadata.clone(),
- );
+ ));
let file = File::open("test/data/uk_cities.csv").unwrap();
- let mut csv = Reader::new(
- file,
- Arc::new(schema.clone()),
- false,
- None,
- 1024,
- None,
- None,
- None,
- );
- assert_eq!(Arc::new(schema), csv.schema());
+ let mut csv = ReaderBuilder::new(schema.clone()).build(file).unwrap();
+ assert_eq!(schema, csv.schema());
let batch = csv.next().unwrap().unwrap();
assert_eq!(37, batch.num_rows());
assert_eq!(3, batch.num_columns());
@@ -1361,16 +1158,15 @@ mod tests {
#[test]
fn test_csv_reader_with_decimal() {
- let schema = Schema::new(vec![
+ let schema = Arc::new(Schema::new(vec![
Field::new("city", DataType::Utf8, false),
Field::new("lat", DataType::Decimal128(38, 6), false),
Field::new("lng", DataType::Decimal256(76, 6), false),
- ]);
+ ]));
let file = File::open("test/data/decimal_test.csv").unwrap();
- let mut csv =
- Reader::new(file, Arc::new(schema), false, None, 1024, None, None,
None);
+ let mut csv = ReaderBuilder::new(schema).build(file).unwrap();
let batch = csv.next().unwrap().unwrap();
// access data from a primitive array
let lat = batch
@@ -1422,16 +1218,10 @@ mod tests {
let both_files = file_with_headers
.chain(Cursor::new("\n".to_string()))
.chain(file_without_headers);
- let mut csv = Reader::new(
- both_files,
- Arc::new(schema),
- true,
- None,
- 1024,
- None,
- None,
- None,
- );
+ let mut csv = ReaderBuilder::new(Arc::new(schema))
+ .has_header(true)
+ .build(both_files)
+ .unwrap();
let batch = csv.next().unwrap().unwrap();
assert_eq!(74, batch.num_rows());
assert_eq!(3, batch.num_columns());
@@ -1439,9 +1229,15 @@ mod tests {
#[test]
fn test_csv_with_schema_inference() {
- let file = File::open("test/data/uk_cities_with_headers.csv").unwrap();
+ let mut file =
File::open("test/data/uk_cities_with_headers.csv").unwrap();
- let builder = ReaderBuilder::new().has_header(true).infer_schema(None);
+ let (schema, _) = Format::default()
+ .with_header(true)
+ .infer_schema(&mut file, None)
+ .unwrap();
+
+ file.rewind().unwrap();
+ let builder = ReaderBuilder::new(Arc::new(schema)).has_header(true);
let mut csv = builder.build(file).unwrap();
let expected_schema = Schema::new(vec![
@@ -1474,11 +1270,12 @@ mod tests {
#[test]
fn test_csv_with_schema_inference_no_headers() {
- let file = File::open("test/data/uk_cities.csv").unwrap();
+ let mut file = File::open("test/data/uk_cities.csv").unwrap();
- let builder = ReaderBuilder::new().infer_schema(None);
+ let (schema, _) = Format::default().infer_schema(&mut file,
None).unwrap();
+ file.rewind().unwrap();
- let mut csv = builder.build(file).unwrap();
+ let mut csv =
ReaderBuilder::new(Arc::new(schema)).build(file).unwrap();
// csv field names should be 'column_{number}'
let schema = csv.schema();
@@ -1512,10 +1309,15 @@ mod tests {
#[test]
fn test_csv_builder_with_bounds() {
- let file = File::open("test/data/uk_cities.csv").unwrap();
+ let mut file = File::open("test/data/uk_cities.csv").unwrap();
// Set the bounds to the lines 0, 1 and 2.
- let mut csv = ReaderBuilder::new().with_bounds(0,
2).build(file).unwrap();
+ let (schema, _) = Format::default().infer_schema(&mut file,
None).unwrap();
+ file.rewind().unwrap();
+ let mut csv = ReaderBuilder::new(Arc::new(schema))
+ .with_bounds(0, 2)
+ .build(file)
+ .unwrap();
let batch = csv.next().unwrap().unwrap();
// access data from a string array (ListArray<u8>)
@@ -1536,24 +1338,19 @@ mod tests {
#[test]
fn test_csv_with_projection() {
- let schema = Schema::new(vec![
+ let schema = Arc::new(Schema::new(vec![
Field::new("city", DataType::Utf8, false),
Field::new("lat", DataType::Float64, false),
Field::new("lng", DataType::Float64, false),
- ]);
+ ]));
let file = File::open("test/data/uk_cities.csv").unwrap();
- let mut csv = Reader::new(
- file,
- Arc::new(schema),
- false,
- None,
- 1024,
- None,
- Some(vec![0, 1]),
- None,
- );
+ let mut csv = ReaderBuilder::new(schema)
+ .with_projection(vec![0, 1])
+ .build(file)
+ .unwrap();
+
let projected_schema = Arc::new(Schema::new(vec![
Field::new("city", DataType::Utf8, false),
Field::new("lat", DataType::Float64, false),
@@ -1567,24 +1364,19 @@ mod tests {
#[test]
fn test_csv_with_dictionary() {
- let schema = Schema::new(vec![
+ let schema = Arc::new(Schema::new(vec![
 Field::new_dictionary("city", DataType::Int32, DataType::Utf8, false),
Field::new("lat", DataType::Float64, false),
Field::new("lng", DataType::Float64, false),
- ]);
+ ]));
let file = File::open("test/data/uk_cities.csv").unwrap();
- let mut csv = Reader::new(
- file,
- Arc::new(schema),
- false,
- None,
- 1024,
- None,
- Some(vec![0, 1]),
- None,
- );
+ let mut csv = ReaderBuilder::new(schema)
+ .with_projection(vec![0, 1])
+ .build(file)
+ .unwrap();
+
let projected_schema = Arc::new(Schema::new(vec![
 Field::new_dictionary("city", DataType::Int32, DataType::Utf8, false),
Field::new("lat", DataType::Float64, false),
@@ -1596,7 +1388,7 @@ mod tests {
assert_eq!(2, batch.num_columns());
 let strings = arrow_cast::cast(batch.column(0), &DataType::Utf8).unwrap();
- let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
+ let strings = strings.as_string::<i32>();
assert_eq!(strings.value(0), "Elgin, Scotland, the UK");
assert_eq!(strings.value(4), "Eastbourne, East Sussex, UK");
@@ -1605,17 +1397,20 @@ mod tests {
#[test]
fn test_nulls() {
- let schema = Schema::new(vec![
+ let schema = Arc::new(Schema::new(vec![
Field::new("c_int", DataType::UInt64, false),
Field::new("c_float", DataType::Float32, true),
Field::new("c_string", DataType::Utf8, false),
Field::new("c_bool", DataType::Boolean, false),
- ]);
+ ]));
let file = File::open("test/data/null_test.csv").unwrap();
- let mut csv =
- Reader::new(file, Arc::new(schema), true, None, 1024, None, None, None);
+ let mut csv = ReaderBuilder::new(schema)
+ .has_header(true)
+ .build(file)
+ .unwrap();
+
let batch = csv.next().unwrap().unwrap();
assert!(!batch.column(1).is_null(0));
@@ -1627,12 +1422,14 @@ mod tests {
#[test]
fn test_nulls_with_inference() {
- let file = File::open("test/data/various_types.csv").unwrap();
+ let mut file = File::open("test/data/various_types.csv").unwrap();
+ let format = Format::default().with_header(true).with_delimiter(b'|');
- let builder = ReaderBuilder::new()
- .infer_schema(None)
- .has_header(true)
- .with_delimiter(b'|')
+ let (schema, _) = format.infer_schema(&mut file, None).unwrap();
+ file.rewind().unwrap();
+
+ let builder = ReaderBuilder::new(Arc::new(schema))
+ .with_format(format)
.with_batch_size(512)
.with_projection(vec![0, 1, 2, 3, 4, 5]);
@@ -1693,8 +1490,7 @@ mod tests {
Field::new("c_bool", DataType::Boolean, false),
]);
- let builder = ReaderBuilder::new()
- .with_schema(Arc::new(schema))
+ let builder = ReaderBuilder::new(Arc::new(schema))
.has_header(true)
.with_delimiter(b'|')
.with_batch_size(512)
@@ -1714,95 +1510,94 @@ mod tests {
}
/// Infer the data type of a record
- fn infer_field_schema(string: &str, datetime_re: Option<Regex>) -> DataType {
+ fn infer_field_schema(string: &str) -> DataType {
let mut v = InferredDataType::default();
- v.update(string, datetime_re.as_ref());
+ v.update(string);
v.get()
}
#[test]
fn test_infer_field_schema() {
- assert_eq!(infer_field_schema("A", None), DataType::Utf8);
- assert_eq!(infer_field_schema("\"123\"", None), DataType::Utf8);
- assert_eq!(infer_field_schema("10", None), DataType::Int64);
- assert_eq!(infer_field_schema("10.2", None), DataType::Float64);
- assert_eq!(infer_field_schema(".2", None), DataType::Float64);
- assert_eq!(infer_field_schema("2.", None), DataType::Float64);
- assert_eq!(infer_field_schema("true", None), DataType::Boolean);
- assert_eq!(infer_field_schema("trUe", None), DataType::Boolean);
- assert_eq!(infer_field_schema("false", None), DataType::Boolean);
- assert_eq!(infer_field_schema("2020-11-08", None), DataType::Date32);
+ assert_eq!(infer_field_schema("A"), DataType::Utf8);
+ assert_eq!(infer_field_schema("\"123\""), DataType::Utf8);
+ assert_eq!(infer_field_schema("10"), DataType::Int64);
+ assert_eq!(infer_field_schema("10.2"), DataType::Float64);
+ assert_eq!(infer_field_schema(".2"), DataType::Float64);
+ assert_eq!(infer_field_schema("2."), DataType::Float64);
+ assert_eq!(infer_field_schema("true"), DataType::Boolean);
+ assert_eq!(infer_field_schema("trUe"), DataType::Boolean);
+ assert_eq!(infer_field_schema("false"), DataType::Boolean);
+ assert_eq!(infer_field_schema("2020-11-08"), DataType::Date32);
assert_eq!(
- infer_field_schema("2020-11-08T14:20:01", None),
+ infer_field_schema("2020-11-08T14:20:01"),
DataType::Timestamp(TimeUnit::Second, None)
);
assert_eq!(
- infer_field_schema("2020-11-08 14:20:01", None),
+ infer_field_schema("2020-11-08 14:20:01"),
DataType::Timestamp(TimeUnit::Second, None)
);
- let reg = Regex::new(r"^\d{4}-\d\d-\d\d \d\d:\d\d:\d\d$").ok();
assert_eq!(
- infer_field_schema("2020-11-08 14:20:01", reg),
+ infer_field_schema("2020-11-08 14:20:01"),
DataType::Timestamp(TimeUnit::Second, None)
);
- assert_eq!(infer_field_schema("-5.13", None), DataType::Float64);
- assert_eq!(infer_field_schema("0.1300", None), DataType::Float64);
+ assert_eq!(infer_field_schema("-5.13"), DataType::Float64);
+ assert_eq!(infer_field_schema("0.1300"), DataType::Float64);
assert_eq!(
- infer_field_schema("2021-12-19 13:12:30.921", None),
+ infer_field_schema("2021-12-19 13:12:30.921"),
DataType::Timestamp(TimeUnit::Millisecond, None)
);
assert_eq!(
- infer_field_schema("2021-12-19T13:12:30.123456789", None),
+ infer_field_schema("2021-12-19T13:12:30.123456789"),
DataType::Timestamp(TimeUnit::Nanosecond, None)
);
}
#[test]
fn parse_date32() {
- assert_eq!(parse_item::<Date32Type>("1970-01-01").unwrap(), 0);
- assert_eq!(parse_item::<Date32Type>("2020-03-15").unwrap(), 18336);
- assert_eq!(parse_item::<Date32Type>("1945-05-08").unwrap(), -9004);
+ assert_eq!(Date32Type::parse("1970-01-01").unwrap(), 0);
+ assert_eq!(Date32Type::parse("2020-03-15").unwrap(), 18336);
+ assert_eq!(Date32Type::parse("1945-05-08").unwrap(), -9004);
}
#[test]
fn parse_time() {
assert_eq!(
- parse_item::<Time64NanosecondType>("12:10:01.123456789 AM"),
+ Time64NanosecondType::parse("12:10:01.123456789 AM"),
Some(601_123_456_789)
);
assert_eq!(
- parse_item::<Time64MicrosecondType>("12:10:01.123456 am"),
+ Time64MicrosecondType::parse("12:10:01.123456 am"),
Some(601_123_456)
);
assert_eq!(
- parse_item::<Time32MillisecondType>("2:10:01.12 PM"),
+ Time32MillisecondType::parse("2:10:01.12 PM"),
Some(51_001_120)
);
- assert_eq!(parse_item::<Time32SecondType>("2:10:01 pm"), Some(51_001));
+ assert_eq!(Time32SecondType::parse("2:10:01 pm"), Some(51_001));
}
#[test]
fn parse_date64() {
- assert_eq!(parse_item::<Date64Type>("1970-01-01T00:00:00").unwrap(), 0);
+ assert_eq!(Date64Type::parse("1970-01-01T00:00:00").unwrap(), 0);
assert_eq!(
- parse_item::<Date64Type>("2018-11-13T17:11:10").unwrap(),
+ Date64Type::parse("2018-11-13T17:11:10").unwrap(),
1542129070000
);
assert_eq!(
- parse_item::<Date64Type>("2018-11-13T17:11:10.011").unwrap(),
+ Date64Type::parse("2018-11-13T17:11:10.011").unwrap(),
1542129070011
);
assert_eq!(
- parse_item::<Date64Type>("1900-02-28T12:34:56").unwrap(),
+ Date64Type::parse("1900-02-28T12:34:56").unwrap(),
-2203932304000
);
assert_eq!(
- parse_formatted::<Date64Type>("1900-02-28 12:34:56", "%Y-%m-%d %H:%M:%S")
+ Date64Type::parse_formatted("1900-02-28 12:34:56", "%Y-%m-%d %H:%M:%S")
.unwrap(),
-2203932304000
);
assert_eq!(
- parse_formatted::<Date64Type>(
+ Date64Type::parse_formatted(
"1900-02-28 12:34:56+0030",
"%Y-%m-%d %H:%M:%S%z"
)
@@ -1821,13 +1616,13 @@ mod tests {
"1970-01-01T00:00:00+02:00",
]
.join("\n");
- let mut decoder = ReaderBuilder::new()
- .with_schema(Arc::new(Schema::new(vec![Field::new(
- "field",
- DataType::Timestamp(T::UNIT, timezone.clone()),
- true,
- )])))
- .build_decoder();
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "field",
+ DataType::Timestamp(T::UNIT, timezone.clone()),
+ true,
+ )]));
+
+ let mut decoder = ReaderBuilder::new(schema).build_decoder();
let decoded = decoder.decode(csv.as_bytes()).unwrap();
assert_eq!(decoded, csv.len());
@@ -1933,17 +1728,12 @@ mod tests {
let reader = std::io::Cursor::new(data);
- let mut csv = Reader::new(
- reader,
- Arc::new(schema),
- false,
- None,
- 2,
- // starting at row 2 and up to row 6.
- Some((2, 6)),
- Some(vec![0]),
- None,
- );
+ let mut csv = ReaderBuilder::new(Arc::new(schema))
+ .with_batch_size(2)
+ .with_projection(vec![0])
+ .with_bounds(2, 6)
+ .build_buffered(reader)
+ .unwrap();
let batch = csv.next().unwrap().unwrap();
let a = batch.column(0);
@@ -1968,20 +1758,12 @@ mod tests {
.map(|x| x.join(","))
.collect::<Vec<_>>()
.join("\n");
- let data = data.as_bytes();
- let reader = std::io::Cursor::new(data);
-
- let mut csv = Reader::new(
- reader,
- Arc::new(schema),
- false,
- None,
- 2,
- None,
- Some(vec![]),
- None,
- );
+ let mut csv = ReaderBuilder::new(Arc::new(schema))
+ .with_batch_size(2)
+ .with_projection(vec![])
+ .build_buffered(Cursor::new(data.as_bytes()))
+ .unwrap();
let batch = csv.next().unwrap().unwrap();
assert_eq!(batch.columns().len(), 0);
@@ -2012,23 +1794,21 @@ mod tests {
#[test]
fn test_parsing_float() {
- assert_eq!(Some(12.34), parse_item::<Float64Type>("12.34"));
- assert_eq!(Some(-12.34), parse_item::<Float64Type>("-12.34"));
- assert_eq!(Some(12.0), parse_item::<Float64Type>("12"));
- assert_eq!(Some(0.0), parse_item::<Float64Type>("0"));
- assert_eq!(Some(2.0), parse_item::<Float64Type>("2."));
- assert_eq!(Some(0.2), parse_item::<Float64Type>(".2"));
- assert!(parse_item::<Float64Type>("nan").unwrap().is_nan());
- assert!(parse_item::<Float64Type>("NaN").unwrap().is_nan());
- assert!(parse_item::<Float64Type>("inf").unwrap().is_infinite());
- assert!(parse_item::<Float64Type>("inf").unwrap().is_sign_positive());
- assert!(parse_item::<Float64Type>("-inf").unwrap().is_infinite());
- assert!(parse_item::<Float64Type>("-inf")
- .unwrap()
- .is_sign_negative());
- assert_eq!(None, parse_item::<Float64Type>(""));
- assert_eq!(None, parse_item::<Float64Type>("dd"));
- assert_eq!(None, parse_item::<Float64Type>("12.34.56"));
+ assert_eq!(Some(12.34), Float64Type::parse("12.34"));
+ assert_eq!(Some(-12.34), Float64Type::parse("-12.34"));
+ assert_eq!(Some(12.0), Float64Type::parse("12"));
+ assert_eq!(Some(0.0), Float64Type::parse("0"));
+ assert_eq!(Some(2.0), Float64Type::parse("2."));
+ assert_eq!(Some(0.2), Float64Type::parse(".2"));
+ assert!(Float64Type::parse("nan").unwrap().is_nan());
+ assert!(Float64Type::parse("NaN").unwrap().is_nan());
+ assert!(Float64Type::parse("inf").unwrap().is_infinite());
+ assert!(Float64Type::parse("inf").unwrap().is_sign_positive());
+ assert!(Float64Type::parse("-inf").unwrap().is_infinite());
+ assert!(Float64Type::parse("-inf").unwrap().is_sign_negative());
+ assert_eq!(None, Float64Type::parse(""));
+ assert_eq!(None, Float64Type::parse("dd"));
+ assert_eq!(None, Float64Type::parse("12.34.56"));
}
#[test]
@@ -2037,8 +1817,7 @@ mod tests {
Field::new("text1", DataType::Utf8, false),
Field::new("text2", DataType::Utf8, false),
]);
- let builder = ReaderBuilder::new()
- .with_schema(Arc::new(schema))
+ let builder = ReaderBuilder::new(Arc::new(schema))
.has_header(false)
.with_quote(b'~'); // default is ", change to ~
@@ -2070,8 +1849,7 @@ mod tests {
Field::new("text1", DataType::Utf8, false),
Field::new("text2", DataType::Utf8, false),
]);
- let builder = ReaderBuilder::new()
- .with_schema(Arc::new(schema))
+ let builder = ReaderBuilder::new(Arc::new(schema))
.has_header(false)
.with_escape(b'\\'); // default is None, change to \
@@ -2103,8 +1881,7 @@ mod tests {
Field::new("text1", DataType::Utf8, false),
Field::new("text2", DataType::Utf8, false),
]);
- let builder = ReaderBuilder::new()
- .with_schema(Arc::new(schema))
+ let builder = ReaderBuilder::new(Arc::new(schema))
.has_header(false)
.with_terminator(b'\n'); // default is CRLF, change to LF
@@ -2141,14 +1918,18 @@ mod tests {
(Some((0, 4)), true, 4),
(Some((1, 4)), true, 3),
];
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("a", DataType::Utf8, false),
+ ]));
for (idx, (bounds, has_header, expected)) in
tests.into_iter().enumerate() {
- let mut reader = ReaderBuilder::new().has_header(has_header);
+ let mut reader = ReaderBuilder::new(schema.clone()).has_header(has_header);
if let Some((start, end)) = bounds {
reader = reader.with_bounds(start, end);
}
let b = reader
- .build(Cursor::new(csv.as_bytes()))
+ .build_buffered(Cursor::new(csv.as_bytes()))
.unwrap()
.next()
.unwrap()
@@ -2160,7 +1941,12 @@ mod tests {
#[test]
fn test_null_boolean() {
let csv = "true,false\nFalse,True\n,True\nFalse,";
- let b = ReaderBuilder::new()
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Boolean, true),
+ Field::new("a", DataType::Boolean, true),
+ ]));
+
+ let b = ReaderBuilder::new(schema)
.build_buffered(Cursor::new(csv.as_bytes()))
.unwrap()
.next()
@@ -2194,9 +1980,14 @@ mod tests {
];
for (path, has_header, expected_rows) in tests {
+ let (schema, _) = Format::default()
+ .infer_schema(File::open(path).unwrap(), None)
+ .unwrap();
+ let schema = Arc::new(schema);
+
for batch_size in [1, 4] {
for capacity in [1, 3, 7, 100] {
- let reader = ReaderBuilder::new()
+ let reader = ReaderBuilder::new(schema.clone())
.with_batch_size(batch_size)
.has_header(has_header)
.build(File::open(path).unwrap())
@@ -2214,7 +2005,7 @@ mod tests {
File::open(path).unwrap(),
);
- let reader = ReaderBuilder::new()
+ let reader = ReaderBuilder::new(schema.clone())
.with_batch_size(batch_size)
.has_header(has_header)
.build_buffered(buffered)
@@ -2233,8 +2024,7 @@ mod tests {
Field::new("text2", DataType::Utf8, false),
]));
let buffer = std::io::BufReader::with_capacity(2, Cursor::new(csv));
- let b = ReaderBuilder::new()
- .with_schema(schema)
+ let b = ReaderBuilder::new(schema)
.with_batch_size(2)
.build_buffered(buffer)
.unwrap();
@@ -2314,8 +2104,7 @@ mod tests {
]));
let csv = "foo,bar\nbaz,foo\na,b\nc,d";
let mut read = InstrumentedRead::new(Cursor::new(csv.as_bytes()));
- let reader = ReaderBuilder::new()
- .with_schema(schema)
+ let reader = ReaderBuilder::new(schema)
.with_batch_size(3)
.build_buffered(&mut read)
.unwrap();
@@ -2383,7 +2172,7 @@ mod tests {
for (values, expected) in cases {
let mut t = InferredDataType::default();
for v in *values {
- t.update(v, None)
+ t.update(v)
}
assert_eq!(&t.get(), expected, "{values:?}")
}
diff --git a/arrow-csv/src/writer.rs b/arrow-csv/src/writer.rs
index 90c32832a..5f542be30 100644
--- a/arrow-csv/src/writer.rs
+++ b/arrow-csv/src/writer.rs
@@ -331,7 +331,7 @@ impl WriterBuilder {
mod tests {
use super::*;
- use crate::Reader;
+ use crate::ReaderBuilder;
use arrow_array::builder::{Decimal128Builder, Decimal256Builder};
use arrow_array::types::*;
use arrow_buffer::i256;
@@ -560,17 +560,11 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,foo
}
buf.set_position(0);
- let mut reader = Reader::new(
- buf,
- Arc::new(schema),
- false,
- None,
- 3,
- // starting at row 2 and up to row 6.
- None,
- None,
- None,
- );
+ let mut reader = ReaderBuilder::new(Arc::new(schema))
+ .with_batch_size(3)
+ .build_buffered(buf)
+ .unwrap();
+
let rb = reader.next().unwrap().unwrap();
let c1 = rb.column(0).as_any().downcast_ref::<Date32Array>().unwrap();
let c2 = rb.column(1).as_any().downcast_ref::<Date64Array>().unwrap();
diff --git a/arrow/benches/csv_reader.rs b/arrow/benches/csv_reader.rs
index 66a956315..c2491a5a0 100644
--- a/arrow/benches/csv_reader.rs
+++ b/arrow/benches/csv_reader.rs
@@ -40,8 +40,7 @@ fn do_bench(c: &mut Criterion, name: &str, cols: Vec<ArrayRef>) {
c.bench_function(&format!("{name} - {batch_size}"), |b| {
b.iter(|| {
let cursor = Cursor::new(buf.as_slice());
- let reader = csv::ReaderBuilder::new()
- .with_schema(batch.schema())
+ let reader = csv::ReaderBuilder::new(batch.schema())
.with_batch_size(batch_size)
.has_header(true)
.build_buffered(cursor)
diff --git a/arrow/examples/read_csv.rs b/arrow/examples/read_csv.rs
index efb55c6d2..60545a6e5 100644
--- a/arrow/examples/read_csv.rs
+++ b/arrow/examples/read_csv.rs
@@ -37,8 +37,10 @@ fn main() {
);
let file = File::open(path).unwrap();
- let mut csv =
- csv::Reader::new(file, Arc::new(schema), false, None, 1024, None, None, None);
+ let mut csv = csv::ReaderBuilder::new(Arc::new(schema))
+ .build(file)
+ .unwrap();
+
let batch = csv.next().unwrap().unwrap();
print_batches(&[batch]).unwrap();
}
diff --git a/arrow/examples/read_csv_infer_schema.rs b/arrow/examples/read_csv_infer_schema.rs
index 2a713ba61..bd3c1c6a4 100644
--- a/arrow/examples/read_csv_infer_schema.rs
+++ b/arrow/examples/read_csv_infer_schema.rs
@@ -19,17 +19,22 @@ extern crate arrow;
use arrow::csv;
use arrow::util::pretty::print_batches;
+use arrow_csv::reader::Format;
use std::fs::File;
+use std::io::Seek;
+use std::sync::Arc;
fn main() {
let path = format!(
"{}/../arrow-csv/test/data/uk_cities_with_headers.csv",
env!("CARGO_MANIFEST_DIR")
);
- let file = File::open(path).unwrap();
- let builder = csv::ReaderBuilder::new()
- .has_header(true)
- .infer_schema(Some(100));
+ let mut file = File::open(path).unwrap();
+ let format = Format::default().with_header(true);
+ let (schema, _) = format.infer_schema(&mut file, Some(100)).unwrap();
+ file.rewind().unwrap();
+
+ let builder = csv::ReaderBuilder::new(Arc::new(schema)).with_format(format);
let mut csv = builder.build(file).unwrap();
let batch = csv.next().unwrap().unwrap();
print_batches(&[batch]).unwrap();
diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs
index 0a9950e9c..4e96fb878 100644
--- a/parquet/src/bin/parquet-fromcsv.rs
+++ b/parquet/src/bin/parquet-fromcsv.rs
@@ -314,8 +314,7 @@ fn configure_reader_builder(args: &Args, arrow_schema: Arc<Schema>) -> ReaderBui
}
}
- let mut builder = ReaderBuilder::new()
- .with_schema(arrow_schema)
+ let mut builder = ReaderBuilder::new(arrow_schema)
.with_batch_size(args.batch_size)
.has_header(args.has_header)
.with_delimiter(args.get_delimiter());