This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fa8125fb Cleanup CSV schema inference (#4129) (#4130) (#4133)
9fa8125fb is described below

commit 9fa8125fbe14a3a85b4995617945bda51ee3b055
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Apr 27 08:33:12 2023 -0400

    Cleanup CSV schema inference (#4129) (#4130) (#4133)
    
    * Cleanup CSV schema inference (#4129) (#4130)
    
    * Update tests
    
    * Update parquet-fromcsv
---
 arrow-csv/src/reader/mod.rs             | 979 +++++++++++++-------------------
 arrow-csv/src/writer.rs                 |  18 +-
 arrow/benches/csv_reader.rs             |   3 +-
 arrow/examples/read_csv.rs              |   6 +-
 arrow/examples/read_csv_infer_schema.rs |  13 +-
 parquet/src/bin/parquet-fromcsv.rs      |   3 +-
 6 files changed, 405 insertions(+), 617 deletions(-)

diff --git a/arrow-csv/src/reader/mod.rs b/arrow-csv/src/reader/mod.rs
index 5bfcbc645..74294f42e 100644
--- a/arrow-csv/src/reader/mod.rs
+++ b/arrow-csv/src/reader/mod.rs
@@ -26,7 +26,7 @@
 //!
 //! ```
 //! # use arrow_schema::*;
-//! # use arrow_csv::Reader;
+//! # use arrow_csv::{Reader, ReaderBuilder};
 //! # use std::fs::File;
 //! # use std::sync::Arc;
 //!
@@ -38,7 +38,7 @@
 //!
 //! let file = File::open("test/data/uk_cities.csv").unwrap();
 //!
-//! let mut csv = Reader::new(file, Arc::new(schema), false, None, 1024, None, None, None);
+//! let mut csv = ReaderBuilder::new(Arc::new(schema)).build(file).unwrap();
 //! let batch = csv.next().unwrap().unwrap();
 //! ```
 //!
@@ -131,8 +131,9 @@ use arrow_array::*;
 use arrow_cast::parse::{parse_decimal, string_to_datetime, Parser};
 use arrow_schema::*;
 use chrono::{TimeZone, Utc};
+use csv::StringRecord;
 use lazy_static::lazy_static;
-use regex::{Regex, RegexSet};
+use regex::RegexSet;
 use std::fmt;
 use std::fs::File;
 use std::io::{BufRead, BufReader as StdBufReader, Read, Seek, SeekFrom};
@@ -141,7 +142,6 @@ use std::sync::Arc;
 use crate::map_csv_error;
 use crate::reader::records::{RecordDecoder, StringRecords};
 use arrow_array::timezone::Tz;
-use csv::StringRecord;
 
 lazy_static! {
     /// Order should match [`InferredDataType`]
@@ -194,32 +194,150 @@ impl InferredDataType {
     }
 
     /// Updates the [`InferredDataType`] with the given string
-    fn update(&mut self, string: &str, datetime_re: Option<&Regex>) {
+    fn update(&mut self, string: &str) {
         self.packed |= if string.starts_with('"') {
             1 << 8 // Utf8
         } else if let Some(m) = REGEX_SET.matches(string).into_iter().next() {
             1 << m
         } else {
-            match datetime_re {
-                // Timestamp(Nanosecond)
-                Some(d) if d.is_match(string) => 1 << 7,
-                _ => 1 << 8, // Utf8
-            }
+            1 << 8 // Utf8
         }
     }
 }
 
-/// This is a collection of options for csv reader when the builder pattern cannot be used
-/// and the parameters need to be passed around
-#[derive(Debug, Default, Clone)]
-struct ReaderOptions {
+/// The format specification for the CSV file
+#[derive(Debug, Clone, Default)]
+pub struct Format {
     has_header: bool,
     delimiter: Option<u8>,
     escape: Option<u8>,
     quote: Option<u8>,
     terminator: Option<u8>,
-    max_read_records: Option<usize>,
-    datetime_re: Option<Regex>,
+}
+
+impl Format {
+    pub fn with_header(mut self, has_header: bool) -> Self {
+        self.has_header = has_header;
+        self
+    }
+
+    pub fn with_delimiter(mut self, delimiter: u8) -> Self {
+        self.delimiter = Some(delimiter);
+        self
+    }
+
+    pub fn with_escape(mut self, escape: u8) -> Self {
+        self.escape = Some(escape);
+        self
+    }
+
+    pub fn with_quote(mut self, quote: u8) -> Self {
+        self.quote = Some(quote);
+        self
+    }
+
+    pub fn with_terminator(mut self, terminator: u8) -> Self {
+        self.terminator = Some(terminator);
+        self
+    }
+
+    /// Infer schema of CSV records from the provided `reader`
+    ///
+    /// If `max_records` is `None`, all records will be read, otherwise up to `max_records`
+    /// records are read to infer the schema
+    ///
+    /// Returns inferred schema and number of records read
+    pub fn infer_schema<R: Read>(
+        &self,
+        reader: R,
+        max_records: Option<usize>,
+    ) -> Result<(Schema, usize), ArrowError> {
+        let mut csv_reader = self.build_reader(reader);
+
+        // get or create header names
+        // when has_header is false, creates default column names with column_ prefix
+        let headers: Vec<String> = if self.has_header {
+            let headers = &csv_reader.headers().map_err(map_csv_error)?.clone();
+            headers.iter().map(|s| s.to_string()).collect()
+        } else {
+            let first_record_count = &csv_reader.headers().map_err(map_csv_error)?.len();
+            (0..*first_record_count)
+                .map(|i| format!("column_{}", i + 1))
+                .collect()
+        };
+
+        let header_length = headers.len();
+        // keep track of inferred field types
+        let mut column_types: Vec<InferredDataType> =
+            vec![Default::default(); header_length];
+
+        let mut records_count = 0;
+
+        let mut record = StringRecord::new();
+        let max_records = max_records.unwrap_or(usize::MAX);
+        while records_count < max_records {
+            if !csv_reader.read_record(&mut record).map_err(map_csv_error)? {
+                break;
+            }
+            records_count += 1;
+
+            // Note since we may be looking at a sample of the data, we make the safe assumption that
+            // they could be nullable
+            for (i, column_type) in
+                column_types.iter_mut().enumerate().take(header_length)
+            {
+                if let Some(string) = record.get(i) {
+                    if !string.is_empty() {
+                        column_type.update(string)
+                    }
+                }
+            }
+        }
+
+        // build schema from inference results
+        let fields: Fields = column_types
+            .iter()
+            .zip(&headers)
+            .map(|(inferred, field_name)| Field::new(field_name, inferred.get(), true))
+            .collect();
+
+        Ok((Schema::new(fields), records_count))
+    }
+
+    /// Build a [`csv::Reader`] for this [`Format`]
+    fn build_reader<R: Read>(&self, reader: R) -> csv::Reader<R> {
+        let mut builder = csv::ReaderBuilder::new();
+        builder.has_headers(self.has_header);
+
+        if let Some(c) = self.delimiter {
+            builder.delimiter(c);
+        }
+        builder.escape(self.escape);
+        if let Some(c) = self.quote {
+            builder.quote(c);
+        }
+        if let Some(t) = self.terminator {
+            builder.terminator(csv::Terminator::Any(t));
+        }
+        builder.from_reader(reader)
+    }
+
+    /// Build a [`csv_core::Reader`] for this [`Format`]
+    fn build_parser(&self) -> csv_core::Reader {
+        let mut builder = csv_core::ReaderBuilder::new();
+        builder.escape(self.escape);
+
+        if let Some(c) = self.delimiter {
+            builder.delimiter(c);
+        }
+        if let Some(c) = self.quote {
+            builder.quote(c);
+        }
+        if let Some(t) = self.terminator {
+            builder.terminator(csv_core::Terminator::Any(t));
+        }
+        builder.build()
+    }
 }
 
 /// Infer the schema of a CSV file by reading through the first n records of 
the file,
@@ -231,34 +349,19 @@ struct ReaderOptions {
 /// reader cursor offset.
 ///
 /// The inferred schema will always have each field set as nullable.
+#[deprecated(note = "Use Format::infer_schema")]
+#[allow(deprecated)]
 pub fn infer_file_schema<R: Read + Seek>(
-    reader: R,
+    mut reader: R,
     delimiter: u8,
     max_read_records: Option<usize>,
     has_header: bool,
-) -> Result<(Schema, usize), ArrowError> {
-    let roptions = ReaderOptions {
-        delimiter: Some(delimiter),
-        max_read_records,
-        has_header,
-        ..Default::default()
-    };
-
-    infer_file_schema_with_csv_options(reader, roptions)
-}
-
-fn infer_file_schema_with_csv_options<R: Read + Seek>(
-    mut reader: R,
-    roptions: ReaderOptions,
 ) -> Result<(Schema, usize), ArrowError> {
     let saved_offset = reader.stream_position()?;
-
-    let (schema, records_count) =
-        infer_reader_schema_with_csv_options(&mut reader, roptions)?;
+    let r = infer_reader_schema(&mut reader, delimiter, max_read_records, has_header)?;
     // return the reader seek back to the start
     reader.seek(SeekFrom::Start(saved_offset))?;
-
-    Ok((schema, records_count))
+    Ok(r)
 }
 
 /// Infer schema of CSV records provided by struct that implements `Read` 
trait.
@@ -267,104 +370,19 @@ fn infer_file_schema_with_csv_options<R: Read + Seek>(
 /// not set, all records are read to infer the schema.
 ///
 /// Return inferred schema and number of records used for inference.
+#[deprecated(note = "Use Format::infer_schema")]
 pub fn infer_reader_schema<R: Read>(
     reader: R,
     delimiter: u8,
     max_read_records: Option<usize>,
     has_header: bool,
 ) -> Result<(Schema, usize), ArrowError> {
-    let roptions = ReaderOptions {
+    let format = Format {
         delimiter: Some(delimiter),
-        max_read_records,
         has_header,
         ..Default::default()
     };
-    infer_reader_schema_with_csv_options(reader, roptions)
-}
-
-/// Creates a `csv::Reader`
-fn build_csv_reader<R: Read>(
-    reader: R,
-    has_header: bool,
-    delimiter: Option<u8>,
-    escape: Option<u8>,
-    quote: Option<u8>,
-    terminator: Option<u8>,
-) -> csv::Reader<R> {
-    let mut reader_builder = csv::ReaderBuilder::new();
-    reader_builder.has_headers(has_header);
-
-    if let Some(c) = delimiter {
-        reader_builder.delimiter(c);
-    }
-    reader_builder.escape(escape);
-    if let Some(c) = quote {
-        reader_builder.quote(c);
-    }
-    if let Some(t) = terminator {
-        reader_builder.terminator(csv::Terminator::Any(t));
-    }
-    reader_builder.from_reader(reader)
-}
-
-fn infer_reader_schema_with_csv_options<R: Read>(
-    reader: R,
-    roptions: ReaderOptions,
-) -> Result<(Schema, usize), ArrowError> {
-    let mut csv_reader = build_csv_reader(
-        reader,
-        roptions.has_header,
-        roptions.delimiter,
-        roptions.escape,
-        roptions.quote,
-        roptions.terminator,
-    );
-
-    // get or create header names
-    // when has_header is false, creates default column names with column_ prefix
-    let headers: Vec<String> = if roptions.has_header {
-        let headers = &csv_reader.headers().map_err(map_csv_error)?.clone();
-        headers.iter().map(|s| s.to_string()).collect()
-    } else {
-    let first_record_count = &csv_reader.headers().map_err(map_csv_error)?.len();
-        (0..*first_record_count)
-            .map(|i| format!("column_{}", i + 1))
-            .collect()
-    };
-
-    let header_length = headers.len();
-    // keep track of inferred field types
-    let mut column_types: Vec<InferredDataType> = vec![Default::default(); header_length];
-
-    let mut records_count = 0;
-
-    let mut record = StringRecord::new();
-    let max_records = roptions.max_read_records.unwrap_or(usize::MAX);
-    while records_count < max_records {
-        if !csv_reader.read_record(&mut record).map_err(map_csv_error)? {
-            break;
-        }
-        records_count += 1;
-
-        // Note since we may be looking at a sample of the data, we make the 
safe assumption that
-        // they could be nullable
-        for (i, column_type) in column_types.iter_mut().enumerate().take(header_length) {
-            if let Some(string) = record.get(i) {
-                if !string.is_empty() {
-                    column_type.update(string, roptions.datetime_re.as_ref())
-                }
-            }
-        }
-    }
-
-    // build schema from inference results
-    let fields: Fields = column_types
-        .iter()
-        .zip(&headers)
-        .map(|(inferred, field_name)| Field::new(field_name, inferred.get(), true))
-        .collect();
-
-    Ok((Schema::new(fields), records_count))
+    format.infer_schema(reader, max_read_records)
 }
 
 /// Infer schema from a list of CSV files by reading through first n records
@@ -381,14 +399,15 @@ pub fn infer_schema_from_files(
 ) -> Result<Schema, ArrowError> {
     let mut schemas = vec![];
     let mut records_to_read = max_read_records.unwrap_or(usize::MAX);
+    let format = Format {
+        delimiter: Some(delimiter),
+        has_header,
+        ..Default::default()
+    };
 
     for fname in files.iter() {
-        let (schema, records_read) = infer_file_schema(
-            &mut File::open(fname)?,
-            delimiter,
-            Some(records_to_read),
-            has_header,
-        )?;
+        let f = File::open(fname)?;
+        let (schema, records_read) = format.infer_schema(f, 
Some(records_to_read))?;
         if records_read == 0 {
             continue;
         }
@@ -429,46 +448,6 @@ where
 }
 
 impl<R: Read> Reader<R> {
-    /// Create a new CsvReader from any value that implements the `Read` trait.
-    ///
-    /// If reading a `File` or an input that supports `std::io::Read` and 
`std::io::Seek`;
-    /// you can customise the Reader, such as to enable schema inference, use
-    /// `ReaderBuilder`.
-    #[allow(clippy::too_many_arguments)]
-    pub fn new(
-        reader: R,
-        schema: SchemaRef,
-        has_header: bool,
-        delimiter: Option<u8>,
-        batch_size: usize,
-        bounds: Bounds,
-        projection: Option<Vec<usize>>,
-        datetime_format: Option<String>,
-    ) -> Self {
-        let mut builder = ReaderBuilder::new()
-            .has_header(has_header)
-            .with_batch_size(batch_size)
-            .with_schema(schema);
-
-        if let Some(delimiter) = delimiter {
-            builder = builder.with_delimiter(delimiter);
-        }
-        if let Some((start, end)) = bounds {
-            builder = builder.with_bounds(start, end);
-        }
-        if let Some(projection) = projection {
-            builder = builder.with_projection(projection)
-        }
-        if let Some(format) = datetime_format {
-            builder = builder.with_datetime_format(format)
-        }
-
-        Self {
-            decoder: builder.build_decoder(),
-            reader: StdBufReader::new(reader),
-        }
-    }
-
     /// Returns the schema of the reader, useful for getting the schema 
without reading
     /// record batches
     pub fn schema(&self) -> SchemaRef {
@@ -481,34 +460,6 @@ impl<R: Read> Reader<R> {
             None => self.decoder.schema.clone(),
         }
     }
-
-    /// Create a new CsvReader from a Reader
-    ///
-    /// This constructor allows you more flexibility in what records are 
processed by the
-    /// csv reader.
-    #[allow(clippy::too_many_arguments)]
-    #[deprecated(note = "Use Reader::new or ReaderBuilder")]
-    pub fn from_reader(
-        reader: R,
-        schema: SchemaRef,
-        has_header: bool,
-        delimiter: Option<u8>,
-        batch_size: usize,
-        bounds: Bounds,
-        projection: Option<Vec<usize>>,
-        datetime_format: Option<String>,
-    ) -> Self {
-        Self::new(
-            reader,
-            schema,
-            has_header,
-            delimiter,
-            batch_size,
-            bounds,
-            projection,
-            datetime_format,
-        )
-    }
 }
 
 impl<R: BufRead> BufReader<R> {
@@ -558,8 +509,7 @@ impl<R: BufRead> Iterator for BufReader<R> {
 ///     schema: SchemaRef,
 ///     batch_size: usize,
 /// ) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, 
ArrowError> {
-///     let mut decoder = ReaderBuilder::new()
-///         .with_schema(schema)
+///     let mut decoder = ReaderBuilder::new(schema)
 ///         .with_batch_size(batch_size)
 ///         .build_decoder();
 ///
@@ -601,11 +551,6 @@ pub struct Decoder {
 
     /// A decoder for [`StringRecords`]
     record_decoder: RecordDecoder,
-
-    /// datetime format used to parse datetime values, (format understood by 
chrono)
-    ///
-    /// For format refer to [chrono 
docs](https://docs.rs/chrono/0.4.19/chrono/format/strftime/index.html)
-    datetime_format: Option<String>,
 }
 
 impl Decoder {
@@ -652,7 +597,6 @@ impl Decoder {
             Some(self.schema.metadata.clone()),
             self.projection.as_ref(),
             self.line_number,
-            self.datetime_format.as_deref(),
         )?;
         self.line_number += rows.len();
         Ok(Some(batch))
@@ -671,7 +615,6 @@ fn parse(
     metadata: Option<std::collections::HashMap<String, String>>,
     projection: Option<&Vec<usize>>,
     line_number: usize,
-    datetime_format: Option<&str>,
 ) -> Result<RecordBatch, ArrowError> {
     let projection: Vec<usize> = match projection {
         Some(v) => v.clone(),
@@ -703,63 +646,52 @@ fn parse(
                         *scale,
                     )
                 }
-                DataType::Int8 => {
-                    build_primitive_array::<Int8Type>(line_number, rows, i, 
None)
-                }
+                DataType::Int8 => 
build_primitive_array::<Int8Type>(line_number, rows, i),
                 DataType::Int16 => {
-                    build_primitive_array::<Int16Type>(line_number, rows, i, 
None)
+                    build_primitive_array::<Int16Type>(line_number, rows, i)
                 }
                 DataType::Int32 => {
-                    build_primitive_array::<Int32Type>(line_number, rows, i, 
None)
+                    build_primitive_array::<Int32Type>(line_number, rows, i)
                 }
                 DataType::Int64 => {
-                    build_primitive_array::<Int64Type>(line_number, rows, i, 
None)
+                    build_primitive_array::<Int64Type>(line_number, rows, i)
                 }
                 DataType::UInt8 => {
-                    build_primitive_array::<UInt8Type>(line_number, rows, i, 
None)
+                    build_primitive_array::<UInt8Type>(line_number, rows, i)
                 }
                 DataType::UInt16 => {
-                    build_primitive_array::<UInt16Type>(line_number, rows, i, 
None)
+                    build_primitive_array::<UInt16Type>(line_number, rows, i)
                 }
                 DataType::UInt32 => {
-                    build_primitive_array::<UInt32Type>(line_number, rows, i, 
None)
+                    build_primitive_array::<UInt32Type>(line_number, rows, i)
                 }
                 DataType::UInt64 => {
-                    build_primitive_array::<UInt64Type>(line_number, rows, i, 
None)
+                    build_primitive_array::<UInt64Type>(line_number, rows, i)
                 }
                 DataType::Float32 => {
-                    build_primitive_array::<Float32Type>(line_number, rows, i, 
None)
+                    build_primitive_array::<Float32Type>(line_number, rows, i)
                 }
                 DataType::Float64 => {
-                    build_primitive_array::<Float64Type>(line_number, rows, i, 
None)
+                    build_primitive_array::<Float64Type>(line_number, rows, i)
                 }
                 DataType::Date32 => {
-                    build_primitive_array::<Date32Type>(line_number, rows, i, 
None)
+                    build_primitive_array::<Date32Type>(line_number, rows, i)
+                }
+                DataType::Date64 => {
+                    build_primitive_array::<Date64Type>(line_number, rows, i)
                 }
-                DataType::Date64 => build_primitive_array::<Date64Type>(
-                    line_number,
-                    rows,
-                    i,
-                    datetime_format,
-                ),
                 DataType::Time32(TimeUnit::Second) => {
-                    build_primitive_array::<Time32SecondType>(line_number, 
rows, i, None)
+                    build_primitive_array::<Time32SecondType>(line_number, 
rows, i)
+                }
+                DataType::Time32(TimeUnit::Millisecond) => {
+                    
build_primitive_array::<Time32MillisecondType>(line_number, rows, i)
+                }
+                DataType::Time64(TimeUnit::Microsecond) => {
+                    
build_primitive_array::<Time64MicrosecondType>(line_number, rows, i)
+                }
+                DataType::Time64(TimeUnit::Nanosecond) => {
+                    build_primitive_array::<Time64NanosecondType>(line_number, 
rows, i)
                 }
-                DataType::Time32(TimeUnit::Millisecond) => 
build_primitive_array::<
-                    Time32MillisecondType,
-                >(
-                    line_number, rows, i, None
-                ),
-                DataType::Time64(TimeUnit::Microsecond) => 
build_primitive_array::<
-                    Time64MicrosecondType,
-                >(
-                    line_number, rows, i, None
-                ),
-                DataType::Time64(TimeUnit::Nanosecond) => 
build_primitive_array::<
-                    Time64NanosecondType,
-                >(
-                    line_number, rows, i, None
-                ),
                 DataType::Timestamp(TimeUnit::Second, tz) => {
                     build_timestamp_array::<TimestampSecondType>(
                         line_number,
@@ -871,13 +803,6 @@ fn parse(
         )
     })
 }
-fn parse_item<T: Parser>(string: &str) -> Option<T::Native> {
-    T::parse(string)
-}
-
-fn parse_formatted<T: Parser>(string: &str, format: &str) -> Option<T::Native> 
{
-    T::parse_formatted(string, format)
-}
 
 fn parse_bool(string: &str) -> Option<bool> {
     if string.eq_ignore_ascii_case("false") {
@@ -928,7 +853,6 @@ fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
     line_number: usize,
     rows: &StringRecords<'_>,
     col_idx: usize,
-    format: Option<&str>,
 ) -> Result<ArrayRef, ArrowError> {
     rows.iter()
         .enumerate()
@@ -938,11 +862,7 @@ fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
                 return Ok(None);
             }
 
-            let parsed = match format {
-                Some(format) => parse_formatted::<T>(s, format),
-                _ => parse_item::<T>(s),
-            };
-            match parsed {
+            match T::parse(s) {
                 Some(e) => Ok(Some(e)),
                 None => Err(ArrowError::ParseError(format!(
                     // TODO: we should surface the underlying error here.
@@ -1037,28 +957,10 @@ fn build_boolean_array(
 /// CSV file reader builder
 #[derive(Debug)]
 pub struct ReaderBuilder {
-    /// Optional schema for the CSV file
-    ///
-    /// If the schema is not supplied, the reader will try to infer the schema
-    /// based on the CSV structure.
-    schema: Option<SchemaRef>,
-    /// Whether the file has headers or not
-    ///
-    /// If schema inference is run on a file with no headers, default column 
names
-    /// are created.
-    has_header: bool,
-    /// An optional column delimiter. Defaults to `b','`
-    delimiter: Option<u8>,
-    /// An optional escape character. Defaults None
-    escape: Option<u8>,
-    /// An optional quote character. Defaults b'\"'
-    quote: Option<u8>,
-    /// An optional record terminator. Defaults CRLF
-    terminator: Option<u8>,
-    /// Optional maximum number of records to read during schema inference
-    ///
-    /// If a number is not provided, all the records are read.
-    max_records: Option<usize>,
+    /// Schema of the CSV file
+    schema: SchemaRef,
+    /// Format of the CSV file
+    format: Format,
     /// Batch size (number of records to load each time)
     ///
     /// The default batch size when using the `ReaderBuilder` is 1024 records
@@ -1067,29 +969,6 @@ pub struct ReaderBuilder {
     bounds: Bounds,
     /// Optional projection for which columns to load (zero-based column 
indices)
     projection: Option<Vec<usize>>,
-    /// DateTime format to be used while trying to infer datetime format
-    datetime_re: Option<Regex>,
-    /// DateTime format to be used while parsing datetime format
-    datetime_format: Option<String>,
-}
-
-impl Default for ReaderBuilder {
-    fn default() -> Self {
-        Self {
-            schema: None,
-            has_header: false,
-            delimiter: None,
-            escape: None,
-            quote: None,
-            terminator: None,
-            max_records: None,
-            batch_size: 1024,
-            bounds: None,
-            projection: None,
-            datetime_re: None,
-            datetime_format: None,
-        }
-    }
 }
 
 impl ReaderBuilder {
@@ -1100,79 +979,60 @@ impl ReaderBuilder {
     /// # Example
     ///
     /// ```
-    /// use arrow_csv::{Reader, ReaderBuilder};
-    /// use std::fs::File;
-    ///
-    /// fn example() -> Reader<File> {
-    ///     let file = 
File::open("test/data/uk_cities_with_headers.csv").unwrap();
-    ///
-    ///     // create a builder, inferring the schema with the first 100 
records
-    ///     let builder = ReaderBuilder::new().infer_schema(Some(100));
-    ///
-    ///     let reader = builder.build(file).unwrap();
+    /// # use arrow_csv::{Reader, ReaderBuilder};
+    /// # use std::fs::File;
+    /// # use std::io::Seek;
+    /// # use std::sync::Arc;
+    /// # use arrow_csv::reader::Format;
+    /// #
+    /// let mut file = 
File::open("test/data/uk_cities_with_headers.csv").unwrap();
+    /// // Infer the schema with the first 100 records
+    /// let (schema, _) = Format::default().infer_schema(&mut file, 
Some(100)).unwrap();
+    /// file.rewind().unwrap();
     ///
-    ///     reader
-    /// }
+    /// // create a builder
+    /// ReaderBuilder::new(Arc::new(schema)).build(file).unwrap();
     /// ```
-    pub fn new() -> ReaderBuilder {
-        ReaderBuilder::default()
-    }
-
-    /// Set the CSV file's schema
-    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
-        self.schema = Some(schema);
-        self
+    pub fn new(schema: SchemaRef) -> ReaderBuilder {
+        Self {
+            schema,
+            format: Format::default(),
+            batch_size: 1024,
+            bounds: None,
+            projection: None,
+        }
     }
 
     /// Set whether the CSV file has headers
     pub fn has_header(mut self, has_header: bool) -> Self {
-        self.has_header = has_header;
+        self.format.has_header = has_header;
         self
     }
 
-    /// Set the datetime regex used to parse the string to Date64Type
-    /// this regex is used while inferring schema
-    pub fn with_datetime_re(mut self, datetime_re: Regex) -> Self {
-        self.datetime_re = Some(datetime_re);
-        self
-    }
-
-    /// Set the datetime format used to parse the string to Date64Type
-    /// this format is used while when the schema wants to parse Date64Type.
-    ///
-    /// For format refer to [chrono 
docs](https://docs.rs/chrono/0.4.19/chrono/format/strftime/index.html)
-    ///
-    pub fn with_datetime_format(mut self, datetime_format: String) -> Self {
-        self.datetime_format = Some(datetime_format);
+    /// Overrides the [`Format`] of this [`ReaderBuilder`]
+    pub fn with_format(mut self, format: Format) -> Self {
+        self.format = format;
         self
     }
 
     /// Set the CSV file's column delimiter as a byte character
     pub fn with_delimiter(mut self, delimiter: u8) -> Self {
-        self.delimiter = Some(delimiter);
+        self.format.delimiter = Some(delimiter);
         self
     }
 
     pub fn with_escape(mut self, escape: u8) -> Self {
-        self.escape = Some(escape);
+        self.format.escape = Some(escape);
         self
     }
 
     pub fn with_quote(mut self, quote: u8) -> Self {
-        self.quote = Some(quote);
+        self.format.quote = Some(quote);
         self
     }
 
     pub fn with_terminator(mut self, terminator: u8) -> Self {
-        self.terminator = Some(terminator);
-        self
-    }
-
-    /// Set the CSV reader to infer the schema of the file
-    pub fn infer_schema(mut self, max_records: Option<usize>) -> Self {
-        // remove any schema that is set
-        self.schema = None;
-        self.max_records = max_records;
+        self.format.terminator = Some(terminator);
         self
     }
 
@@ -1199,32 +1059,15 @@ impl ReaderBuilder {
     ///
     /// If `R: BufRead` consider using [`Self::build_buffered`] to avoid 
unnecessary additional
     /// buffering, as internally this method wraps `reader` in 
[`std::io::BufReader`]
-    pub fn build<R: Read + Seek>(self, reader: R) -> Result<Reader<R>, 
ArrowError> {
+    pub fn build<R: Read>(self, reader: R) -> Result<Reader<R>, ArrowError> {
         self.build_buffered(StdBufReader::new(reader))
     }
 
     /// Create a new `BufReader` from a buffered reader
-    pub fn build_buffered<R: BufRead + Seek>(
-        mut self,
-        mut reader: R,
+    pub fn build_buffered<R: BufRead>(
+        self,
+        reader: R,
     ) -> Result<BufReader<R>, ArrowError> {
-        // check if schema should be inferred
-        if self.schema.is_none() {
-            let delimiter = self.delimiter.unwrap_or(b',');
-            let roptions = ReaderOptions {
-                delimiter: Some(delimiter),
-                max_read_records: self.max_records,
-                has_header: self.has_header,
-                escape: self.escape,
-                quote: self.quote,
-                terminator: self.terminator,
-                datetime_re: self.datetime_re.take(),
-            };
-            let (inferred_schema, _) =
-                infer_file_schema_with_csv_options(&mut reader, roptions)?;
-            self.schema = Some(Arc::new(inferred_schema))
-        }
-
         Ok(BufReader {
             reader,
             decoder: self.build_decoder(),
@@ -1232,28 +1075,11 @@ impl ReaderBuilder {
     }
 
     /// Builds a decoder that can be used to decode CSV from an arbitrary byte 
stream
-    ///
-    /// # Panics
-    ///
-    /// This method panics if no schema provided
     pub fn build_decoder(self) -> Decoder {
-        let schema = self.schema.expect("schema should be provided");
-        let mut reader_builder = csv_core::ReaderBuilder::new();
-        reader_builder.escape(self.escape);
-
-        if let Some(c) = self.delimiter {
-            reader_builder.delimiter(c);
-        }
-        if let Some(c) = self.quote {
-            reader_builder.quote(c);
-        }
-        if let Some(t) = self.terminator {
-            reader_builder.terminator(csv_core::Terminator::Any(t));
-        }
-        let delimiter = reader_builder.build();
-        let record_decoder = RecordDecoder::new(delimiter, 
schema.fields().len());
+        let delimiter = self.format.build_parser();
+        let record_decoder = RecordDecoder::new(delimiter, 
self.schema.fields().len());
 
-        let header = self.has_header as usize;
+        let header = self.format.has_header as usize;
 
         let (start, end) = match self.bounds {
             Some((start, end)) => (start + header, end + header),
@@ -1261,13 +1087,12 @@ impl ReaderBuilder {
         };
 
         Decoder {
-            schema,
+            schema: self.schema,
             to_skip: start,
             record_decoder,
             line_number: start,
             end,
             projection: self.projection,
-            datetime_format: self.datetime_format,
             batch_size: self.batch_size,
         }
     }
@@ -1284,74 +1109,46 @@ mod tests {
 
     #[test]
     fn test_csv() {
-        for format in [None, Some("%Y-%m-%dT%H:%M:%S%.f%:z".to_string())] {
-            let schema = Schema::new(vec![
-                Field::new("city", DataType::Utf8, false),
-                Field::new("lat", DataType::Float64, false),
-                Field::new("lng", DataType::Float64, false),
-            ]);
-
-            let file = File::open("test/data/uk_cities.csv").unwrap();
-            let mut csv = Reader::new(
-                file,
-                Arc::new(schema.clone()),
-                false,
-                None,
-                1024,
-                None,
-                None,
-                format,
-            );
-            assert_eq!(Arc::new(schema), csv.schema());
-            let batch = csv.next().unwrap().unwrap();
-            assert_eq!(37, batch.num_rows());
-            assert_eq!(3, batch.num_columns());
-
-            // access data from a primitive array
-            let lat = batch
-                .column(1)
-                .as_any()
-                .downcast_ref::<Float64Array>()
-                .unwrap();
-            assert_eq!(57.653484, lat.value(0));
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("city", DataType::Utf8, false),
+            Field::new("lat", DataType::Float64, false),
+            Field::new("lng", DataType::Float64, false),
+        ]));
 
-            // access data from a string array (ListArray<u8>)
-            let city = batch
-                .column(0)
-                .as_any()
-                .downcast_ref::<StringArray>()
-                .unwrap();
+        let file = File::open("test/data/uk_cities.csv").unwrap();
+        let mut csv = ReaderBuilder::new(schema.clone()).build(file).unwrap();
+        assert_eq!(schema, csv.schema());
+        let batch = csv.next().unwrap().unwrap();
+        assert_eq!(37, batch.num_rows());
+        assert_eq!(3, batch.num_columns());
 
-            assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13));
-        }
+        // access data from a primitive array
+        let lat = batch.column(1).as_primitive::<Float64Type>();
+        assert_eq!(57.653484, lat.value(0));
+
+        // access data from a string array (ListArray<u8>)
+        let city = batch.column(0).as_string::<i32>();
+
+        assert_eq!("Aberdeen, Aberdeen City, UK", city.value(13));
     }
 
     #[test]
     fn test_csv_schema_metadata() {
         let mut metadata = std::collections::HashMap::new();
         metadata.insert("foo".to_owned(), "bar".to_owned());
-        let schema = Schema::new_with_metadata(
+        let schema = Arc::new(Schema::new_with_metadata(
             vec![
                 Field::new("city", DataType::Utf8, false),
                 Field::new("lat", DataType::Float64, false),
                 Field::new("lng", DataType::Float64, false),
             ],
             metadata.clone(),
-        );
+        ));
 
         let file = File::open("test/data/uk_cities.csv").unwrap();
 
-        let mut csv = Reader::new(
-            file,
-            Arc::new(schema.clone()),
-            false,
-            None,
-            1024,
-            None,
-            None,
-            None,
-        );
-        assert_eq!(Arc::new(schema), csv.schema());
+        let mut csv = ReaderBuilder::new(schema.clone()).build(file).unwrap();
+        assert_eq!(schema, csv.schema());
         let batch = csv.next().unwrap().unwrap();
         assert_eq!(37, batch.num_rows());
         assert_eq!(3, batch.num_columns());
@@ -1361,16 +1158,15 @@ mod tests {
 
     #[test]
     fn test_csv_reader_with_decimal() {
-        let schema = Schema::new(vec![
+        let schema = Arc::new(Schema::new(vec![
             Field::new("city", DataType::Utf8, false),
             Field::new("lat", DataType::Decimal128(38, 6), false),
             Field::new("lng", DataType::Decimal256(76, 6), false),
-        ]);
+        ]));
 
         let file = File::open("test/data/decimal_test.csv").unwrap();
 
-        let mut csv =
-            Reader::new(file, Arc::new(schema), false, None, 1024, None, None, None);
+        let mut csv = ReaderBuilder::new(schema).build(file).unwrap();
         let batch = csv.next().unwrap().unwrap();
         // access data from a primitive array
         let lat = batch
@@ -1422,16 +1218,10 @@ mod tests {
         let both_files = file_with_headers
             .chain(Cursor::new("\n".to_string()))
             .chain(file_without_headers);
-        let mut csv = Reader::new(
-            both_files,
-            Arc::new(schema),
-            true,
-            None,
-            1024,
-            None,
-            None,
-            None,
-        );
+        let mut csv = ReaderBuilder::new(Arc::new(schema))
+            .has_header(true)
+            .build(both_files)
+            .unwrap();
         let batch = csv.next().unwrap().unwrap();
         assert_eq!(74, batch.num_rows());
         assert_eq!(3, batch.num_columns());
@@ -1439,9 +1229,15 @@ mod tests {
 
     #[test]
     fn test_csv_with_schema_inference() {
-        let file = File::open("test/data/uk_cities_with_headers.csv").unwrap();
+        let mut file = File::open("test/data/uk_cities_with_headers.csv").unwrap();
 
-        let builder = ReaderBuilder::new().has_header(true).infer_schema(None);
+        let (schema, _) = Format::default()
+            .with_header(true)
+            .infer_schema(&mut file, None)
+            .unwrap();
+
+        file.rewind().unwrap();
+        let builder = ReaderBuilder::new(Arc::new(schema)).has_header(true);
 
         let mut csv = builder.build(file).unwrap();
         let expected_schema = Schema::new(vec![
@@ -1474,11 +1270,12 @@ mod tests {
 
     #[test]
     fn test_csv_with_schema_inference_no_headers() {
-        let file = File::open("test/data/uk_cities.csv").unwrap();
+        let mut file = File::open("test/data/uk_cities.csv").unwrap();
 
-        let builder = ReaderBuilder::new().infer_schema(None);
+        let (schema, _) = Format::default().infer_schema(&mut file, None).unwrap();
+        file.rewind().unwrap();
 
-        let mut csv = builder.build(file).unwrap();
+        let mut csv = ReaderBuilder::new(Arc::new(schema)).build(file).unwrap();
 
         // csv field names should be 'column_{number}'
         let schema = csv.schema();
@@ -1512,10 +1309,15 @@ mod tests {
 
     #[test]
     fn test_csv_builder_with_bounds() {
-        let file = File::open("test/data/uk_cities.csv").unwrap();
+        let mut file = File::open("test/data/uk_cities.csv").unwrap();
 
         // Set the bounds to the lines 0, 1 and 2.
-        let mut csv = ReaderBuilder::new().with_bounds(0, 2).build(file).unwrap();
+        let (schema, _) = Format::default().infer_schema(&mut file, None).unwrap();
+        file.rewind().unwrap();
+        let mut csv = ReaderBuilder::new(Arc::new(schema))
+            .with_bounds(0, 2)
+            .build(file)
+            .unwrap();
         let batch = csv.next().unwrap().unwrap();
 
         // access data from a string array (ListArray<u8>)
@@ -1536,24 +1338,19 @@ mod tests {
 
     #[test]
     fn test_csv_with_projection() {
-        let schema = Schema::new(vec![
+        let schema = Arc::new(Schema::new(vec![
             Field::new("city", DataType::Utf8, false),
             Field::new("lat", DataType::Float64, false),
             Field::new("lng", DataType::Float64, false),
-        ]);
+        ]));
 
         let file = File::open("test/data/uk_cities.csv").unwrap();
 
-        let mut csv = Reader::new(
-            file,
-            Arc::new(schema),
-            false,
-            None,
-            1024,
-            None,
-            Some(vec![0, 1]),
-            None,
-        );
+        let mut csv = ReaderBuilder::new(schema)
+            .with_projection(vec![0, 1])
+            .build(file)
+            .unwrap();
+
         let projected_schema = Arc::new(Schema::new(vec![
             Field::new("city", DataType::Utf8, false),
             Field::new("lat", DataType::Float64, false),
@@ -1567,24 +1364,19 @@ mod tests {
 
     #[test]
     fn test_csv_with_dictionary() {
-        let schema = Schema::new(vec![
+        let schema = Arc::new(Schema::new(vec![
             Field::new_dictionary("city", DataType::Int32, DataType::Utf8, false),
             Field::new("lat", DataType::Float64, false),
             Field::new("lng", DataType::Float64, false),
-        ]);
+        ]));
 
         let file = File::open("test/data/uk_cities.csv").unwrap();
 
-        let mut csv = Reader::new(
-            file,
-            Arc::new(schema),
-            false,
-            None,
-            1024,
-            None,
-            Some(vec![0, 1]),
-            None,
-        );
+        let mut csv = ReaderBuilder::new(schema)
+            .with_projection(vec![0, 1])
+            .build(file)
+            .unwrap();
+
         let projected_schema = Arc::new(Schema::new(vec![
             Field::new_dictionary("city", DataType::Int32, DataType::Utf8, false),
             Field::new("lat", DataType::Float64, false),
@@ -1596,7 +1388,7 @@ mod tests {
         assert_eq!(2, batch.num_columns());
 
         let strings = arrow_cast::cast(batch.column(0), &DataType::Utf8).unwrap();
-        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
+        let strings = strings.as_string::<i32>();
 
         assert_eq!(strings.value(0), "Elgin, Scotland, the UK");
         assert_eq!(strings.value(4), "Eastbourne, East Sussex, UK");
@@ -1605,17 +1397,20 @@ mod tests {
 
     #[test]
     fn test_nulls() {
-        let schema = Schema::new(vec![
+        let schema = Arc::new(Schema::new(vec![
             Field::new("c_int", DataType::UInt64, false),
             Field::new("c_float", DataType::Float32, true),
             Field::new("c_string", DataType::Utf8, false),
             Field::new("c_bool", DataType::Boolean, false),
-        ]);
+        ]));
 
         let file = File::open("test/data/null_test.csv").unwrap();
 
-        let mut csv =
-            Reader::new(file, Arc::new(schema), true, None, 1024, None, None, None);
+        let mut csv = ReaderBuilder::new(schema)
+            .has_header(true)
+            .build(file)
+            .unwrap();
+
         let batch = csv.next().unwrap().unwrap();
 
         assert!(!batch.column(1).is_null(0));
@@ -1627,12 +1422,14 @@ mod tests {
 
     #[test]
     fn test_nulls_with_inference() {
-        let file = File::open("test/data/various_types.csv").unwrap();
+        let mut file = File::open("test/data/various_types.csv").unwrap();
+        let format = Format::default().with_header(true).with_delimiter(b'|');
 
-        let builder = ReaderBuilder::new()
-            .infer_schema(None)
-            .has_header(true)
-            .with_delimiter(b'|')
+        let (schema, _) = format.infer_schema(&mut file, None).unwrap();
+        file.rewind().unwrap();
+
+        let builder = ReaderBuilder::new(Arc::new(schema))
+            .with_format(format)
             .with_batch_size(512)
             .with_projection(vec![0, 1, 2, 3, 4, 5]);
 
@@ -1693,8 +1490,7 @@ mod tests {
             Field::new("c_bool", DataType::Boolean, false),
         ]);
 
-        let builder = ReaderBuilder::new()
-            .with_schema(Arc::new(schema))
+        let builder = ReaderBuilder::new(Arc::new(schema))
             .has_header(true)
             .with_delimiter(b'|')
             .with_batch_size(512)
@@ -1714,95 +1510,94 @@ mod tests {
     }
 
     /// Infer the data type of a record
-    fn infer_field_schema(string: &str, datetime_re: Option<Regex>) -> DataType {
+    fn infer_field_schema(string: &str) -> DataType {
         let mut v = InferredDataType::default();
-        v.update(string, datetime_re.as_ref());
+        v.update(string);
         v.get()
     }
 
     #[test]
     fn test_infer_field_schema() {
-        assert_eq!(infer_field_schema("A", None), DataType::Utf8);
-        assert_eq!(infer_field_schema("\"123\"", None), DataType::Utf8);
-        assert_eq!(infer_field_schema("10", None), DataType::Int64);
-        assert_eq!(infer_field_schema("10.2", None), DataType::Float64);
-        assert_eq!(infer_field_schema(".2", None), DataType::Float64);
-        assert_eq!(infer_field_schema("2.", None), DataType::Float64);
-        assert_eq!(infer_field_schema("true", None), DataType::Boolean);
-        assert_eq!(infer_field_schema("trUe", None), DataType::Boolean);
-        assert_eq!(infer_field_schema("false", None), DataType::Boolean);
-        assert_eq!(infer_field_schema("2020-11-08", None), DataType::Date32);
+        assert_eq!(infer_field_schema("A"), DataType::Utf8);
+        assert_eq!(infer_field_schema("\"123\""), DataType::Utf8);
+        assert_eq!(infer_field_schema("10"), DataType::Int64);
+        assert_eq!(infer_field_schema("10.2"), DataType::Float64);
+        assert_eq!(infer_field_schema(".2"), DataType::Float64);
+        assert_eq!(infer_field_schema("2."), DataType::Float64);
+        assert_eq!(infer_field_schema("true"), DataType::Boolean);
+        assert_eq!(infer_field_schema("trUe"), DataType::Boolean);
+        assert_eq!(infer_field_schema("false"), DataType::Boolean);
+        assert_eq!(infer_field_schema("2020-11-08"), DataType::Date32);
         assert_eq!(
-            infer_field_schema("2020-11-08T14:20:01", None),
+            infer_field_schema("2020-11-08T14:20:01"),
             DataType::Timestamp(TimeUnit::Second, None)
         );
         assert_eq!(
-            infer_field_schema("2020-11-08 14:20:01", None),
+            infer_field_schema("2020-11-08 14:20:01"),
             DataType::Timestamp(TimeUnit::Second, None)
         );
-        let reg = Regex::new(r"^\d{4}-\d\d-\d\d \d\d:\d\d:\d\d$").ok();
         assert_eq!(
-            infer_field_schema("2020-11-08 14:20:01", reg),
+            infer_field_schema("2020-11-08 14:20:01"),
             DataType::Timestamp(TimeUnit::Second, None)
         );
-        assert_eq!(infer_field_schema("-5.13", None), DataType::Float64);
-        assert_eq!(infer_field_schema("0.1300", None), DataType::Float64);
+        assert_eq!(infer_field_schema("-5.13"), DataType::Float64);
+        assert_eq!(infer_field_schema("0.1300"), DataType::Float64);
         assert_eq!(
-            infer_field_schema("2021-12-19 13:12:30.921", None),
+            infer_field_schema("2021-12-19 13:12:30.921"),
             DataType::Timestamp(TimeUnit::Millisecond, None)
         );
         assert_eq!(
-            infer_field_schema("2021-12-19T13:12:30.123456789", None),
+            infer_field_schema("2021-12-19T13:12:30.123456789"),
             DataType::Timestamp(TimeUnit::Nanosecond, None)
         );
     }
 
     #[test]
     fn parse_date32() {
-        assert_eq!(parse_item::<Date32Type>("1970-01-01").unwrap(), 0);
-        assert_eq!(parse_item::<Date32Type>("2020-03-15").unwrap(), 18336);
-        assert_eq!(parse_item::<Date32Type>("1945-05-08").unwrap(), -9004);
+        assert_eq!(Date32Type::parse("1970-01-01").unwrap(), 0);
+        assert_eq!(Date32Type::parse("2020-03-15").unwrap(), 18336);
+        assert_eq!(Date32Type::parse("1945-05-08").unwrap(), -9004);
     }
 
     #[test]
     fn parse_time() {
         assert_eq!(
-            parse_item::<Time64NanosecondType>("12:10:01.123456789 AM"),
+            Time64NanosecondType::parse("12:10:01.123456789 AM"),
             Some(601_123_456_789)
         );
         assert_eq!(
-            parse_item::<Time64MicrosecondType>("12:10:01.123456 am"),
+            Time64MicrosecondType::parse("12:10:01.123456 am"),
             Some(601_123_456)
         );
         assert_eq!(
-            parse_item::<Time32MillisecondType>("2:10:01.12 PM"),
+            Time32MillisecondType::parse("2:10:01.12 PM"),
             Some(51_001_120)
         );
-        assert_eq!(parse_item::<Time32SecondType>("2:10:01 pm"), Some(51_001));
+        assert_eq!(Time32SecondType::parse("2:10:01 pm"), Some(51_001));
     }
 
     #[test]
     fn parse_date64() {
-        assert_eq!(parse_item::<Date64Type>("1970-01-01T00:00:00").unwrap(), 0);
+        assert_eq!(Date64Type::parse("1970-01-01T00:00:00").unwrap(), 0);
         assert_eq!(
-            parse_item::<Date64Type>("2018-11-13T17:11:10").unwrap(),
+            Date64Type::parse("2018-11-13T17:11:10").unwrap(),
             1542129070000
         );
         assert_eq!(
-            parse_item::<Date64Type>("2018-11-13T17:11:10.011").unwrap(),
+            Date64Type::parse("2018-11-13T17:11:10.011").unwrap(),
             1542129070011
         );
         assert_eq!(
-            parse_item::<Date64Type>("1900-02-28T12:34:56").unwrap(),
+            Date64Type::parse("1900-02-28T12:34:56").unwrap(),
             -2203932304000
         );
         assert_eq!(
-            parse_formatted::<Date64Type>("1900-02-28 12:34:56", "%Y-%m-%d %H:%M:%S")
+            Date64Type::parse_formatted("1900-02-28 12:34:56", "%Y-%m-%d %H:%M:%S")
                 .unwrap(),
             -2203932304000
         );
         assert_eq!(
-            parse_formatted::<Date64Type>(
+            Date64Type::parse_formatted(
                 "1900-02-28 12:34:56+0030",
                 "%Y-%m-%d %H:%M:%S%z"
             )
@@ -1821,13 +1616,13 @@ mod tests {
             "1970-01-01T00:00:00+02:00",
         ]
         .join("\n");
-        let mut decoder = ReaderBuilder::new()
-            .with_schema(Arc::new(Schema::new(vec![Field::new(
-                "field",
-                DataType::Timestamp(T::UNIT, timezone.clone()),
-                true,
-            )])))
-            .build_decoder();
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "field",
+            DataType::Timestamp(T::UNIT, timezone.clone()),
+            true,
+        )]));
+
+        let mut decoder = ReaderBuilder::new(schema).build_decoder();
 
         let decoded = decoder.decode(csv.as_bytes()).unwrap();
         assert_eq!(decoded, csv.len());
@@ -1933,17 +1728,12 @@ mod tests {
 
         let reader = std::io::Cursor::new(data);
 
-        let mut csv = Reader::new(
-            reader,
-            Arc::new(schema),
-            false,
-            None,
-            2,
-            // starting at row 2 and up to row 6.
-            Some((2, 6)),
-            Some(vec![0]),
-            None,
-        );
+        let mut csv = ReaderBuilder::new(Arc::new(schema))
+            .with_batch_size(2)
+            .with_projection(vec![0])
+            .with_bounds(2, 6)
+            .build_buffered(reader)
+            .unwrap();
 
         let batch = csv.next().unwrap().unwrap();
         let a = batch.column(0);
@@ -1968,20 +1758,12 @@ mod tests {
             .map(|x| x.join(","))
             .collect::<Vec<_>>()
             .join("\n");
-        let data = data.as_bytes();
 
-        let reader = std::io::Cursor::new(data);
-
-        let mut csv = Reader::new(
-            reader,
-            Arc::new(schema),
-            false,
-            None,
-            2,
-            None,
-            Some(vec![]),
-            None,
-        );
+        let mut csv = ReaderBuilder::new(Arc::new(schema))
+            .with_batch_size(2)
+            .with_projection(vec![])
+            .build_buffered(Cursor::new(data.as_bytes()))
+            .unwrap();
 
         let batch = csv.next().unwrap().unwrap();
         assert_eq!(batch.columns().len(), 0);
@@ -2012,23 +1794,21 @@ mod tests {
 
     #[test]
     fn test_parsing_float() {
-        assert_eq!(Some(12.34), parse_item::<Float64Type>("12.34"));
-        assert_eq!(Some(-12.34), parse_item::<Float64Type>("-12.34"));
-        assert_eq!(Some(12.0), parse_item::<Float64Type>("12"));
-        assert_eq!(Some(0.0), parse_item::<Float64Type>("0"));
-        assert_eq!(Some(2.0), parse_item::<Float64Type>("2."));
-        assert_eq!(Some(0.2), parse_item::<Float64Type>(".2"));
-        assert!(parse_item::<Float64Type>("nan").unwrap().is_nan());
-        assert!(parse_item::<Float64Type>("NaN").unwrap().is_nan());
-        assert!(parse_item::<Float64Type>("inf").unwrap().is_infinite());
-        assert!(parse_item::<Float64Type>("inf").unwrap().is_sign_positive());
-        assert!(parse_item::<Float64Type>("-inf").unwrap().is_infinite());
-        assert!(parse_item::<Float64Type>("-inf")
-            .unwrap()
-            .is_sign_negative());
-        assert_eq!(None, parse_item::<Float64Type>(""));
-        assert_eq!(None, parse_item::<Float64Type>("dd"));
-        assert_eq!(None, parse_item::<Float64Type>("12.34.56"));
+        assert_eq!(Some(12.34), Float64Type::parse("12.34"));
+        assert_eq!(Some(-12.34), Float64Type::parse("-12.34"));
+        assert_eq!(Some(12.0), Float64Type::parse("12"));
+        assert_eq!(Some(0.0), Float64Type::parse("0"));
+        assert_eq!(Some(2.0), Float64Type::parse("2."));
+        assert_eq!(Some(0.2), Float64Type::parse(".2"));
+        assert!(Float64Type::parse("nan").unwrap().is_nan());
+        assert!(Float64Type::parse("NaN").unwrap().is_nan());
+        assert!(Float64Type::parse("inf").unwrap().is_infinite());
+        assert!(Float64Type::parse("inf").unwrap().is_sign_positive());
+        assert!(Float64Type::parse("-inf").unwrap().is_infinite());
+        assert!(Float64Type::parse("-inf").unwrap().is_sign_negative());
+        assert_eq!(None, Float64Type::parse(""));
+        assert_eq!(None, Float64Type::parse("dd"));
+        assert_eq!(None, Float64Type::parse("12.34.56"));
     }
 
     #[test]
@@ -2037,8 +1817,7 @@ mod tests {
             Field::new("text1", DataType::Utf8, false),
             Field::new("text2", DataType::Utf8, false),
         ]);
-        let builder = ReaderBuilder::new()
-            .with_schema(Arc::new(schema))
+        let builder = ReaderBuilder::new(Arc::new(schema))
             .has_header(false)
             .with_quote(b'~'); // default is ", change to ~
 
@@ -2070,8 +1849,7 @@ mod tests {
             Field::new("text1", DataType::Utf8, false),
             Field::new("text2", DataType::Utf8, false),
         ]);
-        let builder = ReaderBuilder::new()
-            .with_schema(Arc::new(schema))
+        let builder = ReaderBuilder::new(Arc::new(schema))
             .has_header(false)
             .with_escape(b'\\'); // default is None, change to \
 
@@ -2103,8 +1881,7 @@ mod tests {
             Field::new("text1", DataType::Utf8, false),
             Field::new("text2", DataType::Utf8, false),
         ]);
-        let builder = ReaderBuilder::new()
-            .with_schema(Arc::new(schema))
+        let builder = ReaderBuilder::new(Arc::new(schema))
             .has_header(false)
             .with_terminator(b'\n'); // default is CRLF, change to LF
 
@@ -2141,14 +1918,18 @@ mod tests {
             (Some((0, 4)), true, 4),
             (Some((1, 4)), true, 3),
         ];
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Utf8, false),
+            Field::new("a", DataType::Utf8, false),
+        ]));
 
         for (idx, (bounds, has_header, expected)) in tests.into_iter().enumerate() {
-            let mut reader = ReaderBuilder::new().has_header(has_header);
+            let mut reader = ReaderBuilder::new(schema.clone()).has_header(has_header);
             if let Some((start, end)) = bounds {
                 reader = reader.with_bounds(start, end);
             }
             let b = reader
-                .build(Cursor::new(csv.as_bytes()))
+                .build_buffered(Cursor::new(csv.as_bytes()))
                 .unwrap()
                 .next()
                 .unwrap()
@@ -2160,7 +1941,12 @@ mod tests {
     #[test]
     fn test_null_boolean() {
         let csv = "true,false\nFalse,True\n,True\nFalse,";
-        let b = ReaderBuilder::new()
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Boolean, true),
+            Field::new("a", DataType::Boolean, true),
+        ]));
+
+        let b = ReaderBuilder::new(schema)
             .build_buffered(Cursor::new(csv.as_bytes()))
             .unwrap()
             .next()
@@ -2194,9 +1980,14 @@ mod tests {
         ];
 
         for (path, has_header, expected_rows) in tests {
+            let (schema, _) = Format::default()
+                .infer_schema(File::open(path).unwrap(), None)
+                .unwrap();
+            let schema = Arc::new(schema);
+
             for batch_size in [1, 4] {
                 for capacity in [1, 3, 7, 100] {
-                    let reader = ReaderBuilder::new()
+                    let reader = ReaderBuilder::new(schema.clone())
                         .with_batch_size(batch_size)
                         .has_header(has_header)
                         .build(File::open(path).unwrap())
@@ -2214,7 +2005,7 @@ mod tests {
                         File::open(path).unwrap(),
                     );
 
-                    let reader = ReaderBuilder::new()
+                    let reader = ReaderBuilder::new(schema.clone())
                         .with_batch_size(batch_size)
                         .has_header(has_header)
                         .build_buffered(buffered)
@@ -2233,8 +2024,7 @@ mod tests {
             Field::new("text2", DataType::Utf8, false),
         ]));
         let buffer = std::io::BufReader::with_capacity(2, Cursor::new(csv));
-        let b = ReaderBuilder::new()
-            .with_schema(schema)
+        let b = ReaderBuilder::new(schema)
             .with_batch_size(2)
             .build_buffered(buffer)
             .unwrap();
@@ -2314,8 +2104,7 @@ mod tests {
         ]));
         let csv = "foo,bar\nbaz,foo\na,b\nc,d";
         let mut read = InstrumentedRead::new(Cursor::new(csv.as_bytes()));
-        let reader = ReaderBuilder::new()
-            .with_schema(schema)
+        let reader = ReaderBuilder::new(schema)
             .with_batch_size(3)
             .build_buffered(&mut read)
             .unwrap();
@@ -2383,7 +2172,7 @@ mod tests {
         for (values, expected) in cases {
             let mut t = InferredDataType::default();
             for v in *values {
-                t.update(v, None)
+                t.update(v)
             }
             assert_eq!(&t.get(), expected, "{values:?}")
         }
diff --git a/arrow-csv/src/writer.rs b/arrow-csv/src/writer.rs
index 90c32832a..5f542be30 100644
--- a/arrow-csv/src/writer.rs
+++ b/arrow-csv/src/writer.rs
@@ -331,7 +331,7 @@ impl WriterBuilder {
 mod tests {
     use super::*;
 
-    use crate::Reader;
+    use crate::ReaderBuilder;
     use arrow_array::builder::{Decimal128Builder, Decimal256Builder};
     use arrow_array::types::*;
     use arrow_buffer::i256;
@@ -560,17 +560,11 @@ sed do eiusmod tempor,-556132.25,1,,2019-04-18T02:45:55.555000000,23:46:03,foo
         }
         buf.set_position(0);
 
-        let mut reader = Reader::new(
-            buf,
-            Arc::new(schema),
-            false,
-            None,
-            3,
-            // starting at row 2 and up to row 6.
-            None,
-            None,
-            None,
-        );
+        let mut reader = ReaderBuilder::new(Arc::new(schema))
+            .with_batch_size(3)
+            .build_buffered(buf)
+            .unwrap();
+
         let rb = reader.next().unwrap().unwrap();
         let c1 = rb.column(0).as_any().downcast_ref::<Date32Array>().unwrap();
         let c2 = rb.column(1).as_any().downcast_ref::<Date64Array>().unwrap();
diff --git a/arrow/benches/csv_reader.rs b/arrow/benches/csv_reader.rs
index 66a956315..c2491a5a0 100644
--- a/arrow/benches/csv_reader.rs
+++ b/arrow/benches/csv_reader.rs
@@ -40,8 +40,7 @@ fn do_bench(c: &mut Criterion, name: &str, cols: Vec<ArrayRef>) {
         c.bench_function(&format!("{name} - {batch_size}"), |b| {
             b.iter(|| {
                 let cursor = Cursor::new(buf.as_slice());
-                let reader = csv::ReaderBuilder::new()
-                    .with_schema(batch.schema())
+                let reader = csv::ReaderBuilder::new(batch.schema())
                     .with_batch_size(batch_size)
                     .has_header(true)
                     .build_buffered(cursor)
diff --git a/arrow/examples/read_csv.rs b/arrow/examples/read_csv.rs
index efb55c6d2..60545a6e5 100644
--- a/arrow/examples/read_csv.rs
+++ b/arrow/examples/read_csv.rs
@@ -37,8 +37,10 @@ fn main() {
     );
     let file = File::open(path).unwrap();
 
-    let mut csv =
-        csv::Reader::new(file, Arc::new(schema), false, None, 1024, None, None, None);
+    let mut csv = csv::ReaderBuilder::new(Arc::new(schema))
+        .build(file)
+        .unwrap();
+
     let batch = csv.next().unwrap().unwrap();
     print_batches(&[batch]).unwrap();
 }
diff --git a/arrow/examples/read_csv_infer_schema.rs b/arrow/examples/read_csv_infer_schema.rs
index 2a713ba61..bd3c1c6a4 100644
--- a/arrow/examples/read_csv_infer_schema.rs
+++ b/arrow/examples/read_csv_infer_schema.rs
@@ -19,17 +19,22 @@ extern crate arrow;
 
 use arrow::csv;
 use arrow::util::pretty::print_batches;
+use arrow_csv::reader::Format;
 use std::fs::File;
+use std::io::Seek;
+use std::sync::Arc;
 
 fn main() {
     let path = format!(
         "{}/../arrow-csv/test/data/uk_cities_with_headers.csv",
         env!("CARGO_MANIFEST_DIR")
     );
-    let file = File::open(path).unwrap();
-    let builder = csv::ReaderBuilder::new()
-        .has_header(true)
-        .infer_schema(Some(100));
+    let mut file = File::open(path).unwrap();
+    let format = Format::default().with_header(true);
+    let (schema, _) = format.infer_schema(&mut file, Some(100)).unwrap();
+    file.rewind().unwrap();
+
+    let builder = csv::ReaderBuilder::new(Arc::new(schema)).with_format(format);
     let mut csv = builder.build(file).unwrap();
     let batch = csv.next().unwrap().unwrap();
     print_batches(&[batch]).unwrap();
diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs
index 0a9950e9c..4e96fb878 100644
--- a/parquet/src/bin/parquet-fromcsv.rs
+++ b/parquet/src/bin/parquet-fromcsv.rs
@@ -314,8 +314,7 @@ fn configure_reader_builder(args: &Args, arrow_schema: Arc<Schema>) -> ReaderBui
         }
     }
 
-    let mut builder = ReaderBuilder::new()
-        .with_schema(arrow_schema)
+    let mut builder = ReaderBuilder::new(arrow_schema)
         .with_batch_size(args.batch_size)
         .has_header(args.has_header)
         .with_delimiter(args.get_delimiter());

Reply via email to