This is an automated email from the ASF dual-hosted git repository.
bkietz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 7d233cb ARROW-8631: [C++][Python][Dataset] Add ReadOptions to
CsvFileFormat, expose options to python
7d233cb is described below
commit 7d233cb395c84c9c207daad71f5298662d111901
Author: David Li <[email protected]>
AuthorDate: Tue Mar 23 11:58:13 2021 -0400
ARROW-8631: [C++][Python][Dataset] Add ReadOptions to CsvFileFormat, expose
options to python
This adds ReadOptions to CsvFileFormat and exposes ReadOptions,
ConvertOptions, and CsvFragmentScanOptions to Python.
ReadOptions was added to CsvFileFormat as its options can affect the
discovered schema. For the block size, which does not need to be global, a
field was added to CsvFragmentScanOptions.
Closes #9725 from lidavidm/arrow-8631
Authored-by: David Li <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
---
cpp/src/arrow/dataset/dataset_internal.h | 22 ++++
cpp/src/arrow/dataset/file_base.h | 5 +
cpp/src/arrow/dataset/file_csv.cc | 24 +++--
cpp/src/arrow/dataset/file_csv.h | 10 +-
cpp/src/arrow/dataset/file_csv_test.cc | 37 +++++++
cpp/src/arrow/dataset/scanner.cc | 6 ++
cpp/src/arrow/dataset/scanner.h | 3 +
cpp/src/arrow/dataset/type_fwd.h | 3 +
python/pyarrow/_csv.pxd | 17 +++
python/pyarrow/_csv.pyx | 88 ++++++++++++++--
python/pyarrow/_dataset.pyx | 149 +++++++++++++++++++++++++--
python/pyarrow/dataset.py | 1 +
python/pyarrow/includes/libarrow_dataset.pxd | 11 ++
python/pyarrow/tests/test_dataset.py | 61 +++++++++++
r/R/arrowExports.R | 16 ++-
r/R/dataset-format.R | 106 +++++++++++++++++--
r/R/dataset-scan.R | 10 ++
r/src/arrowExports.cpp | 63 ++++++++++-
r/src/dataset.cpp | 33 +++++-
r/tests/testthat/test-dataset.R | 66 ++++++++++++
20 files changed, 690 insertions(+), 41 deletions(-)
diff --git a/cpp/src/arrow/dataset/dataset_internal.h
b/cpp/src/arrow/dataset/dataset_internal.h
index 331ad3d..b28bf7a 100644
--- a/cpp/src/arrow/dataset/dataset_internal.h
+++ b/cpp/src/arrow/dataset/dataset_internal.h
@@ -185,5 +185,27 @@ inline bool operator==(const SubtreeImpl::Encoded& l,
const SubtreeImpl::Encoded
l.partition_expression == r.partition_expression;
}
+/// Get fragment scan options of the expected type.
+/// \return Fragment scan options if provided on the scan options, else the
default
+/// options if set, else a default-constructed value. If options are
provided
+/// but of the wrong type, an error is returned.
+template <typename T>
+arrow::Result<std::shared_ptr<T>> GetFragmentScanOptions(
+ const std::string& type_name, ScanOptions* scan_options,
+ const std::shared_ptr<FragmentScanOptions>& default_options) {
+ auto source = default_options;
+ if (scan_options && scan_options->fragment_scan_options) {
+ source = scan_options->fragment_scan_options;
+ }
+ if (!source) {
+ return std::make_shared<T>();
+ }
+ if (source->type_name() != type_name) {
+ return Status::Invalid("FragmentScanOptions of type ", source->type_name(),
+ " were provided for scanning a fragment of type ",
type_name);
+ }
+ return internal::checked_pointer_cast<T>(source);
+}
+
} // namespace dataset
} // namespace arrow
diff --git a/cpp/src/arrow/dataset/file_base.h
b/cpp/src/arrow/dataset/file_base.h
index ca77f43..9c613c0 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -124,6 +124,11 @@ class ARROW_DS_EXPORT FileSource {
/// \brief Base class for file format implementation
class ARROW_DS_EXPORT FileFormat : public
std::enable_shared_from_this<FileFormat> {
public:
+ /// Options affecting how this format is scanned.
+ ///
+ /// The options here can be overridden at scan time.
+ std::shared_ptr<FragmentScanOptions> default_fragment_scan_options;
+
virtual ~FileFormat() = default;
/// \brief The name identifying the kind of file format
diff --git a/cpp/src/arrow/dataset/file_csv.cc
b/cpp/src/arrow/dataset/file_csv.cc
index b7c7f32..e736d06 100644
--- a/cpp/src/arrow/dataset/file_csv.cc
+++ b/cpp/src/arrow/dataset/file_csv.cc
@@ -82,14 +82,11 @@ static inline Result<csv::ConvertOptions> GetConvertOptions(
ARROW_ASSIGN_OR_RAISE(auto column_names,
GetColumnNames(format.parse_options, first_block,
pool));
- auto convert_options = csv::ConvertOptions::Defaults();
- if (scan_options && scan_options->fragment_scan_options &&
- scan_options->fragment_scan_options->type_name() == kCsvTypeName) {
- auto csv_scan_options =
internal::checked_pointer_cast<CsvFragmentScanOptions>(
- scan_options->fragment_scan_options);
- convert_options = csv_scan_options->convert_options;
- }
-
+ ARROW_ASSIGN_OR_RAISE(
+ auto csv_scan_options,
+ GetFragmentScanOptions<CsvFragmentScanOptions>(
+ kCsvTypeName, scan_options.get(),
format.default_fragment_scan_options));
+ auto convert_options = csv_scan_options->convert_options;
for (FieldRef ref : scan_options->MaterializedFields()) {
ARROW_ASSIGN_OR_RAISE(auto field,
ref.GetOne(*scan_options->dataset_schema));
@@ -99,8 +96,13 @@ static inline Result<csv::ConvertOptions> GetConvertOptions(
return convert_options;
}
-static inline csv::ReadOptions GetReadOptions(const CsvFileFormat& format) {
- auto read_options = csv::ReadOptions::Defaults();
+static inline Result<csv::ReadOptions> GetReadOptions(
+ const CsvFileFormat& format, const std::shared_ptr<ScanOptions>&
scan_options) {
+ ARROW_ASSIGN_OR_RAISE(
+ auto csv_scan_options,
+ GetFragmentScanOptions<CsvFragmentScanOptions>(
+ kCsvTypeName, scan_options.get(),
format.default_fragment_scan_options));
+ auto read_options = csv_scan_options->read_options;
// Multithreaded conversion of individual files would lead to excessive
thread
// contention when ScanTasks are also executed in multiple threads, so we
disable it
// here.
@@ -112,7 +114,7 @@ static inline Result<std::shared_ptr<csv::StreamingReader>>
OpenReader(
const FileSource& source, const CsvFileFormat& format,
const std::shared_ptr<ScanOptions>& scan_options = nullptr,
MemoryPool* pool = default_memory_pool()) {
- auto reader_options = GetReadOptions(format);
+ ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format,
scan_options));
util::string_view first_block;
ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h
index 4c66e29..b235195 100644
--- a/cpp/src/arrow/dataset/file_csv.h
+++ b/cpp/src/arrow/dataset/file_csv.h
@@ -62,11 +62,17 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override { return
NULLPTR; }
};
-class ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions {
- public:
+/// \brief Per-scan options for CSV fragments
+struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions {
std::string type_name() const override { return kCsvTypeName; }
+ /// CSV conversion options
csv::ConvertOptions convert_options = csv::ConvertOptions::Defaults();
+
+ /// CSV reading options
+ ///
+ /// Note that use_threads is always ignored.
+ csv::ReadOptions read_options = csv::ReadOptions::Defaults();
};
} // namespace dataset
diff --git a/cpp/src/arrow/dataset/file_csv_test.cc
b/cpp/src/arrow/dataset/file_csv_test.cc
index c3c8796..99ca7cc 100644
--- a/cpp/src/arrow/dataset/file_csv_test.cc
+++ b/cpp/src/arrow/dataset/file_csv_test.cc
@@ -134,6 +134,43 @@ bar)");
ASSERT_EQ(null_count, 1);
}
+TEST_P(TestCsvFileFormat, CustomReadOptions) {
+ auto source = GetFileSource(R"(header_skipped
+str
+foo
+MYNULL
+N/A
+bar)");
+ SetSchema({field("str", utf8())});
+ auto defaults = std::make_shared<CsvFragmentScanOptions>();
+ defaults->read_options.skip_rows = 1;
+ format_->default_fragment_scan_options = defaults;
+ ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
+ ASSERT_OK_AND_ASSIGN(auto physical_schema, fragment->ReadPhysicalSchema());
+ AssertSchemaEqual(opts_->dataset_schema, physical_schema);
+
+ {
+ int64_t rows = 0;
+ for (auto maybe_batch : Batches(fragment.get())) {
+ ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+ rows += batch->GetColumnByName("str")->length();
+ }
+ ASSERT_EQ(rows, 4);
+ }
+ {
+ // These options completely override the default ones
+ auto fragment_scan_options = std::make_shared<CsvFragmentScanOptions>();
+ fragment_scan_options->read_options.block_size = 1 << 22;
+ opts_->fragment_scan_options = fragment_scan_options;
+ int64_t rows = 0;
+ for (auto maybe_batch : Batches(fragment.get())) {
+ ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+ rows += batch->GetColumnByName("header_skipped")->length();
+ }
+ ASSERT_EQ(rows, 5);
+ }
+}
+
TEST_P(TestCsvFileFormat, ScanRecordBatchReaderWithVirtualColumn) {
auto source = GetFileSource(R"(f64
1.0
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index 1aca9fa..dee96ce 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -149,6 +149,12 @@ Status ScannerBuilder::Pool(MemoryPool* pool) {
return Status::OK();
}
+Status ScannerBuilder::FragmentScanOptions(
+ std::shared_ptr<dataset::FragmentScanOptions> fragment_scan_options) {
+ scan_options_->fragment_scan_options = std::move(fragment_scan_options);
+ return Status::OK();
+}
+
Result<std::shared_ptr<Scanner>> ScannerBuilder::Finish() {
if (!scan_options_->projection.IsBound()) {
RETURN_NOT_OK(Project(scan_options_->dataset_schema->field_names()));
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
index 6e06af0..df5f795 100644
--- a/cpp/src/arrow/dataset/scanner.h
+++ b/cpp/src/arrow/dataset/scanner.h
@@ -240,6 +240,9 @@ class ARROW_DS_EXPORT ScannerBuilder {
/// \brief Set the pool from which materialized and scanned arrays will be
allocated.
Status Pool(MemoryPool* pool);
+ /// \brief Set fragment-specific scan options.
+ Status FragmentScanOptions(std::shared_ptr<FragmentScanOptions>
fragment_scan_options);
+
/// \brief Return the constructed now-immutable Scanner object
Result<std::shared_ptr<Scanner>> Finish();
diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h
index 62395ad..d148d4e 100644
--- a/cpp/src/arrow/dataset/type_fwd.h
+++ b/cpp/src/arrow/dataset/type_fwd.h
@@ -46,6 +46,8 @@ class Fragment;
using FragmentIterator = Iterator<std::shared_ptr<Fragment>>;
using FragmentVector = std::vector<std::shared_ptr<Fragment>>;
+class FragmentScanOptions;
+
class FileSource;
class FileFormat;
class FileFragment;
@@ -58,6 +60,7 @@ struct FileSystemDatasetWriteOptions;
class InMemoryDataset;
class CsvFileFormat;
+struct CsvFragmentScanOptions;
class IpcFileFormat;
class IpcFileWriter;
diff --git a/python/pyarrow/_csv.pxd b/python/pyarrow/_csv.pxd
index 2d9d24a..f8e12f1 100644
--- a/python/pyarrow/_csv.pxd
+++ b/python/pyarrow/_csv.pxd
@@ -21,9 +21,26 @@ from pyarrow.includes.libarrow cimport *
from pyarrow.lib cimport _Weakrefable
+cdef class ConvertOptions(_Weakrefable):
+ cdef:
+ CCSVConvertOptions options
+
+ @staticmethod
+ cdef ConvertOptions wrap(CCSVConvertOptions options)
+
+
cdef class ParseOptions(_Weakrefable):
cdef:
CCSVParseOptions options
@staticmethod
cdef ParseOptions wrap(CCSVParseOptions options)
+
+
+cdef class ReadOptions(_Weakrefable):
+ cdef:
+ CCSVReadOptions options
+ public object encoding
+
+ @staticmethod
+ cdef ReadOptions wrap(CCSVReadOptions options)
diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx
index cce44d1..a98160c 100644
--- a/python/pyarrow/_csv.pyx
+++ b/python/pyarrow/_csv.pyx
@@ -73,9 +73,6 @@ cdef class ReadOptions(_Weakrefable):
The character encoding of the CSV data. Columns that cannot
decode using this encoding can still be read as Binary.
"""
- cdef:
- CCSVReadOptions options
- public object encoding
# Avoid mistakenly creating attributes
__slots__ = ()
@@ -161,6 +158,40 @@ cdef class ReadOptions(_Weakrefable):
def autogenerate_column_names(self, value):
self.options.autogenerate_column_names = value
+ def equals(self, ReadOptions other):
+ return (
+ self.use_threads == other.use_threads and
+ self.block_size == other.block_size and
+ self.skip_rows == other.skip_rows and
+ self.column_names == other.column_names and
+ self.autogenerate_column_names ==
+ other.autogenerate_column_names and
+ self.encoding == other.encoding
+ )
+
+ @staticmethod
+ cdef ReadOptions wrap(CCSVReadOptions options):
+ out = ReadOptions()
+ out.options = options
+ out.encoding = 'utf8' # No way to know this
+ return out
+
+ def __getstate__(self):
+ return (self.use_threads, self.block_size, self.skip_rows,
+ self.column_names, self.autogenerate_column_names,
+ self.encoding)
+
+ def __setstate__(self, state):
+ (self.use_threads, self.block_size, self.skip_rows,
+ self.column_names, self.autogenerate_column_names,
+ self.encoding) = state
+
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return False
+
cdef class ParseOptions(_Weakrefable):
"""
@@ -320,6 +351,12 @@ cdef class ParseOptions(_Weakrefable):
self.escape_char, self.newlines_in_values,
self.ignore_empty_lines) = state
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return False
+
cdef class _ISO8601(_Weakrefable):
"""
@@ -391,9 +428,6 @@ cdef class ConvertOptions(_Weakrefable):
`column_types`, or null by default).
This option is ignored if `include_columns` is empty.
"""
- cdef:
- CCSVConvertOptions options
-
# Avoid mistakenly creating attributes
__slots__ = ()
@@ -603,6 +637,48 @@ cdef class ConvertOptions(_Weakrefable):
self.options.timestamp_parsers = move(c_parsers)
+ @staticmethod
+ cdef ConvertOptions wrap(CCSVConvertOptions options):
+ out = ConvertOptions()
+ out.options = options
+ return out
+
+ def equals(self, ConvertOptions other):
+ return (
+ self.check_utf8 == other.check_utf8 and
+ self.column_types == other.column_types and
+ self.null_values == other.null_values and
+ self.true_values == other.true_values and
+ self.false_values == other.false_values and
+ self.timestamp_parsers == other.timestamp_parsers and
+ self.strings_can_be_null == other.strings_can_be_null and
+ self.auto_dict_encode == other.auto_dict_encode and
+ self.auto_dict_max_cardinality ==
+ other.auto_dict_max_cardinality and
+ self.include_columns == other.include_columns and
+ self.include_missing_columns == other.include_missing_columns
+ )
+
+ def __getstate__(self):
+ return (self.check_utf8, self.column_types, self.null_values,
+ self.true_values, self.false_values, self.timestamp_parsers,
+ self.strings_can_be_null, self.auto_dict_encode,
+ self.auto_dict_max_cardinality, self.include_columns,
+ self.include_missing_columns)
+
+ def __setstate__(self, state):
+ (self.check_utf8, self.column_types, self.null_values,
+ self.true_values, self.false_values, self.timestamp_parsers,
+ self.strings_can_be_null, self.auto_dict_encode,
+ self.auto_dict_max_cardinality, self.include_columns,
+ self.include_missing_columns) = state
+
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return False
+
cdef _get_reader(input_file, ReadOptions read_options,
shared_ptr[CInputStream]* out):
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 4f559f2..169c6e1 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -29,7 +29,7 @@ from pyarrow.lib cimport *
from pyarrow.lib import frombytes, tobytes
from pyarrow.includes.libarrow_dataset cimport *
from pyarrow._fs cimport FileSystem, FileInfo, FileSelector
-from pyarrow._csv cimport ParseOptions
+from pyarrow._csv cimport ConvertOptions, ParseOptions, ReadOptions
from pyarrow.util import _is_path_like, _stringify_path
from pyarrow._parquet cimport (
@@ -377,6 +377,9 @@ cdef class Dataset(_Weakrefable):
memory_pool : MemoryPool, default None
For memory allocations, if required. If not specified, uses the
default pool.
+ fragment_scan_options : FragmentScanOptions, default None
+ Options specific to a particular scan and fragment type, which
+ can change between different scans of the same dataset.
Returns
-------
@@ -714,6 +717,23 @@ cdef class FileFormat(_Weakrefable):
def default_extname(self):
return frombytes(self.format.type_name())
+ @property
+ def default_fragment_scan_options(self):
+ return FragmentScanOptions.wrap(
+ self.wrapped.get().default_fragment_scan_options)
+
+ @default_fragment_scan_options.setter
+ def default_fragment_scan_options(self, FragmentScanOptions options):
+ if options is None:
+ self.wrapped.get().default_fragment_scan_options =\
+ <shared_ptr[CFragmentScanOptions]>nullptr
+ else:
+ self._set_default_fragment_scan_options(options)
+
+ cdef _set_default_fragment_scan_options(self, FragmentScanOptions options):
+ raise ValueError(f"Cannot set fragment scan options for "
+ f"'{options.type_name}' on {self.__class__.__name__}")
+
def __eq__(self, other):
try:
return self.equals(other)
@@ -816,6 +836,9 @@ cdef class Fragment(_Weakrefable):
memory_pool : MemoryPool, default None
For memory allocations, if required. If not specified, uses the
default pool.
+ fragment_scan_options : FragmentScanOptions, default None
+ Options specific to a particular scan and fragment type, which
+ can change between different scans of the same dataset.
Returns
-------
@@ -966,6 +989,45 @@ class RowGroupInfo:
return self.id == other.id
+cdef class FragmentScanOptions(_Weakrefable):
+ """Scan options specific to a particular fragment and scan operation."""
+
+ cdef:
+ shared_ptr[CFragmentScanOptions] wrapped
+
+ def __init__(self):
+ _forbid_instantiation(self.__class__)
+
+ cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
+ self.wrapped = sp
+
+ @staticmethod
+ cdef wrap(const shared_ptr[CFragmentScanOptions]& sp):
+ type_name = frombytes(sp.get().type_name())
+
+ classes = {
+ 'csv': CsvFragmentScanOptions,
+ }
+
+ class_ = classes.get(type_name, None)
+ if class_ is None:
+ raise TypeError(type_name)
+
+ cdef FragmentScanOptions self = class_.__new__(class_)
+ self.init(sp)
+ return self
+
+ @property
+ def type_name(self):
+ return frombytes(self.wrapped.get().type_name())
+
+ def __eq__(self, other):
+ try:
+ return self.equals(other)
+ except TypeError:
+ return False
+
+
cdef class ParquetFileFragment(FileFragment):
"""A Fragment representing a parquet file."""
@@ -1363,10 +1425,18 @@ cdef class CsvFileFormat(FileFormat):
cdef:
CCsvFileFormat* csv_format
- def __init__(self, ParseOptions parse_options=None):
+ # Avoid mistakenly creating attributes
+ __slots__ = ()
+
+ def __init__(self, ParseOptions parse_options=None,
+ ConvertOptions convert_options=None,
+ ReadOptions read_options=None):
self.init(shared_ptr[CFileFormat](new CCsvFileFormat()))
if parse_options is not None:
self.parse_options = parse_options
+ if convert_options is not None or read_options is not None:
+ self.default_fragment_scan_options = CsvFragmentScanOptions(
+ convert_options=convert_options, read_options=read_options)
cdef void init(self, const shared_ptr[CFileFormat]& sp):
FileFormat.init(self, sp)
@@ -1383,12 +1453,68 @@ cdef class CsvFileFormat(FileFormat):
def parse_options(self, ParseOptions parse_options not None):
self.csv_format.parse_options = parse_options.options
+ cdef _set_default_fragment_scan_options(self, FragmentScanOptions options):
+ if options.type_name == 'csv':
+ self.csv_format.default_fragment_scan_options = options.wrapped
+ else:
+ super()._set_default_fragment_scan_options(options)
+
def equals(self, CsvFileFormat other):
return self.parse_options.equals(other.parse_options)
def __reduce__(self):
return CsvFileFormat, (self.parse_options,)
+ def __repr__(self):
+ return f"<CsvFileFormat parse_options={self.parse_options}>"
+
+
+cdef class CsvFragmentScanOptions(FragmentScanOptions):
+ """Scan-specific options for CSV fragments."""
+
+ cdef:
+ CCsvFragmentScanOptions* csv_options
+
+ # Avoid mistakenly creating attributes
+ __slots__ = ()
+
+ def __init__(self, ConvertOptions convert_options=None,
+ ReadOptions read_options=None):
+ self.init(shared_ptr[CFragmentScanOptions](
+ new CCsvFragmentScanOptions()))
+ if convert_options is not None:
+ self.convert_options = convert_options
+ if read_options is not None:
+ self.read_options = read_options
+
+ cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
+ FragmentScanOptions.init(self, sp)
+ self.csv_options = <CCsvFragmentScanOptions*> sp.get()
+
+ @property
+ def convert_options(self):
+ return ConvertOptions.wrap(self.csv_options.convert_options)
+
+ @convert_options.setter
+ def convert_options(self, ConvertOptions convert_options not None):
+ self.csv_options.convert_options = convert_options.options
+
+ @property
+ def read_options(self):
+ return ReadOptions.wrap(self.csv_options.read_options)
+
+ @read_options.setter
+ def read_options(self, ReadOptions read_options not None):
+ self.csv_options.read_options = read_options.options
+
+ def equals(self, CsvFragmentScanOptions other):
+ return (self.convert_options.equals(other.convert_options) and
+ self.read_options.equals(other.read_options))
+
+ def __reduce__(self):
+ return CsvFragmentScanOptions, (self.convert_options,
+ self.read_options)
+
cdef class Partitioning(_Weakrefable):
@@ -2192,7 +2318,9 @@ cdef void _populate_builder(const
shared_ptr[CScannerBuilder]& ptr,
list columns=None, Expression filter=None,
int batch_size=_DEFAULT_BATCH_SIZE,
bint use_threads=True,
- MemoryPool memory_pool=None) except *:
+ MemoryPool memory_pool=None,
+ FragmentScanOptions fragment_scan_options=None)\
+ except *:
cdef:
CScannerBuilder *builder
builder = ptr.get()
@@ -2207,6 +2335,9 @@ cdef void _populate_builder(const
shared_ptr[CScannerBuilder]& ptr,
check_status(builder.UseThreads(use_threads))
if memory_pool:
check_status(builder.Pool(maybe_unbox_memory_pool(memory_pool)))
+ if fragment_scan_options:
+ check_status(
+ builder.FragmentScanOptions(fragment_scan_options.wrapped))
cdef class Scanner(_Weakrefable):
@@ -2269,7 +2400,8 @@ cdef class Scanner(_Weakrefable):
def from_dataset(Dataset dataset not None,
bint use_threads=True, MemoryPool memory_pool=None,
list columns=None, Expression filter=None,
- int batch_size=_DEFAULT_BATCH_SIZE):
+ int batch_size=_DEFAULT_BATCH_SIZE,
+ FragmentScanOptions fragment_scan_options=None):
cdef:
shared_ptr[CScanOptions] options = make_shared[CScanOptions]()
shared_ptr[CScannerBuilder] builder
@@ -2278,7 +2410,8 @@ cdef class Scanner(_Weakrefable):
builder = make_shared[CScannerBuilder](dataset.unwrap(), options)
_populate_builder(builder, columns=columns, filter=filter,
batch_size=batch_size, use_threads=use_threads,
- memory_pool=memory_pool)
+ memory_pool=memory_pool,
+ fragment_scan_options=fragment_scan_options)
scanner = GetResultValue(builder.get().Finish())
return Scanner.wrap(scanner)
@@ -2287,7 +2420,8 @@ cdef class Scanner(_Weakrefable):
def from_fragment(Fragment fragment not None, Schema schema=None,
bint use_threads=True, MemoryPool memory_pool=None,
list columns=None, Expression filter=None,
- int batch_size=_DEFAULT_BATCH_SIZE):
+ int batch_size=_DEFAULT_BATCH_SIZE,
+ FragmentScanOptions fragment_scan_options=None):
cdef:
shared_ptr[CScanOptions] options = make_shared[CScanOptions]()
shared_ptr[CScannerBuilder] builder
@@ -2299,7 +2433,8 @@ cdef class Scanner(_Weakrefable):
fragment.unwrap(), options)
_populate_builder(builder, columns=columns, filter=filter,
batch_size=batch_size, use_threads=use_threads,
- memory_pool=memory_pool)
+ memory_pool=memory_pool,
+ fragment_scan_options=fragment_scan_options)
scanner = GetResultValue(builder.get().Finish())
return Scanner.wrap(scanner)
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index a2cb87a..195d414 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -22,6 +22,7 @@ from pyarrow.util import _stringify_path, _is_path_like
from pyarrow._dataset import ( # noqa
CsvFileFormat,
+ CsvFragmentScanOptions,
Expression,
Dataset,
DatasetFactory,
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd
b/python/pyarrow/includes/libarrow_dataset.pxd
index f37f49f..309d353 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -59,6 +59,9 @@ cdef extern from "arrow/dataset/api.h" namespace
"arrow::dataset" nogil:
@staticmethod
shared_ptr[CScanOptions] Make(shared_ptr[CSchema] schema)
+ cdef cppclass CFragmentScanOptions "arrow::dataset::FragmentScanOptions":
+ c_string type_name() const
+
ctypedef CIterator[shared_ptr[CScanTask]] CScanTaskIterator \
"arrow::dataset::ScanTaskIterator"
@@ -101,6 +104,8 @@ cdef extern from "arrow/dataset/api.h" namespace
"arrow::dataset" nogil:
CStatus UseThreads(c_bool use_threads)
CStatus Pool(CMemoryPool* pool)
CStatus BatchSize(int64_t batch_size)
+ CStatus FragmentScanOptions(
+ shared_ptr[CFragmentScanOptions] fragment_scan_options)
CResult[shared_ptr[CScanner]] Finish()
shared_ptr[CSchema] schema() const
@@ -164,6 +169,7 @@ cdef extern from "arrow/dataset/api.h" namespace
"arrow::dataset" nogil:
c_string type_name() const
cdef cppclass CFileFormat "arrow::dataset::FileFormat":
+ shared_ptr[CFragmentScanOptions] default_fragment_scan_options
c_string type_name() const
CResult[shared_ptr[CSchema]] Inspect(const CFileSource&) const
CResult[shared_ptr[CFileFragment]] MakeFragment(
@@ -252,6 +258,11 @@ cdef extern from "arrow/dataset/api.h" namespace
"arrow::dataset" nogil:
CFileFormat):
CCSVParseOptions parse_options
+ cdef cppclass CCsvFragmentScanOptions \
+ "arrow::dataset::CsvFragmentScanOptions"(CFragmentScanOptions):
+ CCSVConvertOptions convert_options
+ CCSVReadOptions read_options
+
cdef cppclass CPartitioning "arrow::dataset::Partitioning":
c_string type_name() const
CResult[CExpression] Parse(const c_string & path) const
diff --git a/python/pyarrow/tests/test_dataset.py
b/python/pyarrow/tests/test_dataset.py
index e347008..8e6bf9c 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -526,6 +526,10 @@ def test_file_format_pickling():
ds.CsvFileFormat(),
ds.CsvFileFormat(pa.csv.ParseOptions(delimiter='\t',
ignore_empty_lines=True)),
+ ds.CsvFileFormat(read_options=pa.csv.ReadOptions(
+ skip_rows=3, column_names=['foo'])),
+ ds.CsvFileFormat(read_options=pa.csv.ReadOptions(
+ skip_rows=3, block_size=2**20)),
ds.ParquetFileFormat(),
ds.ParquetFileFormat(
read_options=ds.ParquetReadOptions(use_buffered_stream=True)
@@ -541,6 +545,18 @@ def test_file_format_pickling():
assert pickle.loads(pickle.dumps(file_format)) == file_format
+def test_fragment_scan_options_pickling():
+ options = [
+ ds.CsvFragmentScanOptions(),
+ ds.CsvFragmentScanOptions(
+ convert_options=pa.csv.ConvertOptions(strings_can_be_null=True)),
+ ds.CsvFragmentScanOptions(
+ read_options=pa.csv.ReadOptions(block_size=2**16)),
+ ]
+ for option in options:
+ assert pickle.loads(pickle.dumps(option)) == option
+
+
@pytest.mark.parametrize('paths_or_selector', [
fs.FileSelector('subdir', recursive=True),
[
@@ -2242,6 +2258,51 @@ def test_csv_format_compressed(tempdir, compression):
assert result.equals(table)
+def test_csv_format_options(tempdir):
+ path = str(tempdir / 'test.csv')
+ with open(path, 'w') as sink:
+ sink.write('skipped\ncol0\nfoo\nbar\n')
+ dataset = ds.dataset(path, format='csv')
+ result = dataset.to_table()
+ assert result.equals(
+ pa.table({'skipped': pa.array(['col0', 'foo', 'bar'])}))
+
+ dataset = ds.dataset(path, format=ds.CsvFileFormat(
+ read_options=pa.csv.ReadOptions(skip_rows=1)))
+ result = dataset.to_table()
+ assert result.equals(pa.table({'col0': pa.array(['foo', 'bar'])}))
+
+ dataset = ds.dataset(path, format=ds.CsvFileFormat(
+ read_options=pa.csv.ReadOptions(column_names=['foo'])))
+ result = dataset.to_table()
+ assert result.equals(
+ pa.table({'foo': pa.array(['skipped', 'col0', 'foo', 'bar'])}))
+
+
+def test_csv_fragment_options(tempdir):
+ path = str(tempdir / 'test.csv')
+ with open(path, 'w') as sink:
+ sink.write('col0\nfoo\nspam\nMYNULL\n')
+ dataset = ds.dataset(path, format='csv')
+ convert_options = pyarrow.csv.ConvertOptions(null_values=['MYNULL'],
+ strings_can_be_null=True)
+ options = ds.CsvFragmentScanOptions(
+ convert_options=convert_options,
+ read_options=pa.csv.ReadOptions(block_size=2**16))
+ result = dataset.to_table(fragment_scan_options=options)
+ assert result.equals(pa.table({'col0': pa.array(['foo', 'spam', None])}))
+
+ csv_format = ds.CsvFileFormat(convert_options=convert_options)
+ dataset = ds.dataset(path, format=csv_format)
+ result = dataset.to_table()
+ assert result.equals(pa.table({'col0': pa.array(['foo', 'spam', None])}))
+
+ options = ds.CsvFragmentScanOptions()
+ result = dataset.to_table(fragment_scan_options=options)
+ assert result.equals(
+ pa.table({'col0': pa.array(['foo', 'spam', 'MYNULL'])}))
+
+
def test_feather_format(tempdir):
from pyarrow.feather import write_feather
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 0d0d3d3..2c13537 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -428,8 +428,16 @@ dataset___IpcFileFormat__Make <- function(){
.Call(`_arrow_dataset___IpcFileFormat__Make`)
}
-dataset___CsvFileFormat__Make <- function(parse_options){
- .Call(`_arrow_dataset___CsvFileFormat__Make`, parse_options)
+dataset___CsvFileFormat__Make <- function(parse_options, convert_options,
read_options){
+ .Call(`_arrow_dataset___CsvFileFormat__Make`, parse_options,
convert_options, read_options)
+}
+
+dataset___FragmentScanOptions__type_name <- function(fragment_scan_options){
+ .Call(`_arrow_dataset___FragmentScanOptions__type_name`,
fragment_scan_options)
+}
+
+dataset___CsvFragmentScanOptions__Make <- function(convert_options,
read_options){
+ .Call(`_arrow_dataset___CsvFragmentScanOptions__Make`, convert_options,
read_options)
}
dataset___DirectoryPartitioning <- function(schm){
@@ -468,6 +476,10 @@ dataset___ScannerBuilder__BatchSize <- function(sb,
batch_size){
invisible(.Call(`_arrow_dataset___ScannerBuilder__BatchSize`, sb,
batch_size))
}
+dataset___ScannerBuilder__FragmentScanOptions <- function(sb, options){
+ invisible(.Call(`_arrow_dataset___ScannerBuilder__FragmentScanOptions`,
sb, options))
+}
+
dataset___ScannerBuilder__schema <- function(sb){
.Call(`_arrow_dataset___ScannerBuilder__schema`, sb)
}
diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R
index f1bf601..cd54a30 100644
--- a/r/R/dataset-format.R
+++ b/r/R/dataset-format.R
@@ -40,11 +40,18 @@
#' * `buffer_size`: Size of buffered stream, if enabled. Default is 8KB.
#' * `dict_columns`: Names of columns which should be read as dictionaries.
#'
-#' `format = "text"`: see [CsvReadOptions]. Note that you can specify them
either
+#' `format = "text"`: see [CsvParseOptions]. Note that you can specify them
either
#' with the Arrow C++ library naming ("delimiter", "quoting", etc.) or the
#' `readr`-style naming used in [read_csv_arrow()] ("delim", "quote", etc.).
#' Not all `readr` options are currently supported; please file an issue if
-#' you encounter one that `arrow` should support.
+#' you encounter one that `arrow` should support. Also, the following
options are
+#' supported. From [CsvReadOptions]:
+#' * `skip_rows`
+#' * `column_names`
+#' * `autogenerate_column_names`
+#' From [CsvFragmentScanOptions] (these values can be overridden at scan
time):
+#' * `convert_options`: a [CsvConvertOptions]
+#' * `block_size`
#'
#' It returns the appropriate subclass of `FileFormat` (e.g.
`ParquetFileFormat`)
#' @rdname FileFormat
@@ -101,13 +108,21 @@ IpcFileFormat <- R6Class("IpcFileFormat", inherit =
FileFormat)
#' @rdname FileFormat
#' @export
CsvFileFormat <- R6Class("CsvFileFormat", inherit = FileFormat)
-CsvFileFormat$create <- function(..., opts =
csv_file_format_parse_options(...)) {
- dataset___CsvFileFormat__Make(opts)
+CsvFileFormat$create <- function(..., opts =
csv_file_format_parse_options(...),
+ convert_options =
csv_file_format_convert_options(...),
+ read_options =
csv_file_format_read_options(...)) {
+ dataset___CsvFileFormat__Make(opts, convert_options, read_options)
}
# Support both readr-style option names and Arrow C++ option names
csv_file_format_parse_options <- function(...) {
- opt_names <- names(list(...))
+ opts <- list(...)
+ # Filter out arguments meant for CsvConvertOptions/CsvReadOptions
+ convert_opts <- names(formals(CsvConvertOptions$create))
+ read_opts <- names(formals(CsvReadOptions$create))
+ opts[convert_opts] <- NULL
+ opts[read_opts] <- NULL
+ opt_names <- names(opts)
# Catch any readr-style options specified with full option names that are
# supported by read_delim_arrow() (and its wrappers) but are not yet
# supported here
@@ -163,12 +178,89 @@ csv_file_format_parse_options <- function(...) {
stop("Use either Arrow parse options or readr parse options, not both",
call. = FALSE)
}
- readr_to_csv_parse_options(...) # all options have readr-style names
+ do.call(readr_to_csv_parse_options, opts) # all options have readr-style
names
+ } else {
+ do.call(CsvParseOptions$create, opts) # all options have Arrow C++ names
+ }
+}
+
+csv_file_format_convert_options <- function(...) {
+ opts <- list(...)
+ # Filter out arguments meant for CsvParseOptions/CsvReadOptions
+ arrow_opts <- names(formals(CsvParseOptions$create))
+ readr_opts <- names(formals(readr_to_csv_parse_options))
+ read_opts <- names(formals(CsvReadOptions$create))
+ opts[arrow_opts] <- NULL
+ opts[readr_opts] <- NULL
+ opts[read_opts] <- NULL
+ do.call(CsvConvertOptions$create, opts)
+}
+
+csv_file_format_read_options <- function(...) {
+ opts <- list(...)
+ # Filter out arguments meant for CsvParseOptions/CsvConvertOptions
+ arrow_opts <- names(formals(CsvParseOptions$create))
+ readr_opts <- names(formals(readr_to_csv_parse_options))
+ convert_opts <- names(formals(CsvConvertOptions$create))
+ opts[arrow_opts] <- NULL
+ opts[readr_opts] <- NULL
+ opts[convert_opts] <- NULL
+ do.call(CsvReadOptions$create, opts)
+}
+
+#' Format-specific scan options
+#'
+#' @description
+#' A `FragmentScanOptions` holds options specific to a `FileFormat` and a scan
+#' operation.
+#'
+#' @section Factory:
+#' `FragmentScanOptions$create()` takes the following arguments:
+#' * `format`: A string identifier of the file format. Currently supported
values:
+#' * "csv"/"text", aliases for the same format.
+#' * `...`: Additional format-specific options
+#'
+#' `format = "text"`: see [CsvConvertOptions]. Note that options can only be
+#' specified with the Arrow C++ library naming. Also, "block_size" from
+#' [CsvReadOptions] may be given.
+#'
+#' It returns the appropriate subclass of `FragmentScanOptions`
+#' (e.g. `CsvFragmentScanOptions`).
+#' @rdname FragmentScanOptions
+#' @name FragmentScanOptions
+#' @export
+FragmentScanOptions <- R6Class("FragmentScanOptions", inherit = ArrowObject,
+ active = list(
+ # @description
+ # Return the `FragmentScanOptions`'s type
+ type = function() dataset___FragmentScanOptions__type_name(self)
+ )
+)
+FragmentScanOptions$create <- function(format, ...) {
+ if (format %in% c("csv", "text", "tsv")) {
+ CsvFragmentScanOptions$create(...)
} else {
- CsvParseOptions$create(...) # all options have Arrow C++ names
+ stop("Unsupported file format: ", format, call. = FALSE)
}
}
+#' @export
+as.character.FragmentScanOptions <- function(x, ...) {
+ x$type
+}
+
+#' @usage NULL
+#' @format NULL
+#' @rdname FragmentScanOptions
+#' @export
+CsvFragmentScanOptions <- R6Class("CsvFragmentScanOptions", inherit =
FragmentScanOptions)
+CsvFragmentScanOptions$create <- function(...,
+ convert_opts =
csv_file_format_convert_options(...),
+ read_opts =
csv_file_format_read_options(...)) {
+ dataset___CsvFragmentScanOptions__Make(convert_opts, read_opts)
+}
+
#' Format-specific write options
#'
#' @description
diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R
index 1c71bf4..7e14886 100644
--- a/r/R/dataset-scan.R
+++ b/r/R/dataset-scan.R
@@ -67,6 +67,7 @@ Scanner$create <- function(dataset,
filter = TRUE,
use_threads = option_use_threads(),
batch_size = NULL,
+ fragment_scan_options = NULL,
...) {
if (inherits(dataset, "arrow_dplyr_query")) {
if (inherits(dataset$.data, "ArrowTabular")) {
@@ -78,6 +79,8 @@ Scanner$create <- function(dataset,
dataset$selected_columns,
dataset$filtered_rows,
use_threads,
+ batch_size,
+ fragment_scan_options,
...
))
}
@@ -99,6 +102,9 @@ Scanner$create <- function(dataset,
if (is_integerish(batch_size)) {
scanner_builder$BatchSize(batch_size)
}
+ if (!is.null(fragment_scan_options)) {
+ scanner_builder$FragmentScanOptions(fragment_scan_options)
+ }
scanner_builder$Finish()
}
@@ -185,6 +191,10 @@ ScannerBuilder <- R6Class("ScannerBuilder", inherit =
ArrowObject,
dataset___ScannerBuilder__BatchSize(self, batch_size)
self
},
+ FragmentScanOptions = function(options) {
+ dataset___ScannerBuilder__FragmentScanOptions(self, options)
+ self
+ },
Finish = function() dataset___ScannerBuilder__Finish(self)
),
active = list(
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 7229e60..697451d 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1105,21 +1105,54 @@ extern "C" SEXP _arrow_dataset___IpcFileFormat__Make(){
// dataset.cpp
#if defined(ARROW_R_WITH_DATASET)
-std::shared_ptr<ds::CsvFileFormat> dataset___CsvFileFormat__Make(const
std::shared_ptr<arrow::csv::ParseOptions>& parse_options);
-extern "C" SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp){
+std::shared_ptr<ds::CsvFileFormat> dataset___CsvFileFormat__Make(const
std::shared_ptr<arrow::csv::ParseOptions>& parse_options, const
std::shared_ptr<arrow::csv::ConvertOptions>& convert_options, const
std::shared_ptr<arrow::csv::ReadOptions>& read_options);
+extern "C" SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp,
SEXP convert_options_sexp, SEXP read_options_sexp){
BEGIN_CPP11
arrow::r::Input<const std::shared_ptr<arrow::csv::ParseOptions>&>::type
parse_options(parse_options_sexp);
- return cpp11::as_sexp(dataset___CsvFileFormat__Make(parse_options));
+ arrow::r::Input<const
std::shared_ptr<arrow::csv::ConvertOptions>&>::type
convert_options(convert_options_sexp);
+ arrow::r::Input<const std::shared_ptr<arrow::csv::ReadOptions>&>::type
read_options(read_options_sexp);
+ return cpp11::as_sexp(dataset___CsvFileFormat__Make(parse_options,
convert_options, read_options));
END_CPP11
}
#else
-extern "C" SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp){
+extern "C" SEXP _arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp,
SEXP convert_options_sexp, SEXP read_options_sexp){
Rf_error("Cannot call dataset___CsvFileFormat__Make(). See
https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow
C++ libraries. ");
}
#endif
// dataset.cpp
#if defined(ARROW_R_WITH_DATASET)
+std::string dataset___FragmentScanOptions__type_name(const
std::shared_ptr<ds::FragmentScanOptions>& fragment_scan_options);
+extern "C" SEXP _arrow_dataset___FragmentScanOptions__type_name(SEXP
fragment_scan_options_sexp){
+BEGIN_CPP11
+ arrow::r::Input<const std::shared_ptr<ds::FragmentScanOptions>&>::type
fragment_scan_options(fragment_scan_options_sexp);
+ return
cpp11::as_sexp(dataset___FragmentScanOptions__type_name(fragment_scan_options));
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_dataset___FragmentScanOptions__type_name(SEXP
fragment_scan_options_sexp){
+ Rf_error("Cannot call dataset___FragmentScanOptions__type_name(). See
https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow
C++ libraries. ");
+}
+#endif
+
+// dataset.cpp
+#if defined(ARROW_R_WITH_DATASET)
+std::shared_ptr<ds::CsvFragmentScanOptions>
dataset___CsvFragmentScanOptions__Make(const
std::shared_ptr<arrow::csv::ConvertOptions>& convert_options, const
std::shared_ptr<arrow::csv::ReadOptions>& read_options);
+extern "C" SEXP _arrow_dataset___CsvFragmentScanOptions__Make(SEXP
convert_options_sexp, SEXP read_options_sexp){
+BEGIN_CPP11
+ arrow::r::Input<const
std::shared_ptr<arrow::csv::ConvertOptions>&>::type
convert_options(convert_options_sexp);
+ arrow::r::Input<const std::shared_ptr<arrow::csv::ReadOptions>&>::type
read_options(read_options_sexp);
+ return
cpp11::as_sexp(dataset___CsvFragmentScanOptions__Make(convert_options,
read_options));
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_dataset___CsvFragmentScanOptions__Make(SEXP
convert_options_sexp, SEXP read_options_sexp){
+ Rf_error("Cannot call dataset___CsvFragmentScanOptions__Make(). See
https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow
C++ libraries. ");
+}
+#endif
+
+// dataset.cpp
+#if defined(ARROW_R_WITH_DATASET)
std::shared_ptr<ds::DirectoryPartitioning>
dataset___DirectoryPartitioning(const std::shared_ptr<arrow::Schema>& schm);
extern "C" SEXP _arrow_dataset___DirectoryPartitioning(SEXP schm_sexp){
BEGIN_CPP11
@@ -1267,6 +1300,23 @@ extern "C" SEXP
_arrow_dataset___ScannerBuilder__BatchSize(SEXP sb_sexp, SEXP ba
// dataset.cpp
#if defined(ARROW_R_WITH_DATASET)
+void dataset___ScannerBuilder__FragmentScanOptions(const
std::shared_ptr<ds::ScannerBuilder>& sb, const
std::shared_ptr<ds::FragmentScanOptions>& options);
+extern "C" SEXP _arrow_dataset___ScannerBuilder__FragmentScanOptions(SEXP
sb_sexp, SEXP options_sexp){
+BEGIN_CPP11
+ arrow::r::Input<const std::shared_ptr<ds::ScannerBuilder>&>::type
sb(sb_sexp);
+ arrow::r::Input<const std::shared_ptr<ds::FragmentScanOptions>&>::type
options(options_sexp);
+ dataset___ScannerBuilder__FragmentScanOptions(sb, options);
+ return R_NilValue;
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_dataset___ScannerBuilder__FragmentScanOptions(SEXP
sb_sexp, SEXP options_sexp){
+ Rf_error("Cannot call dataset___ScannerBuilder__FragmentScanOptions().
See https://arrow.apache.org/docs/r/articles/install.html for help installing
Arrow C++ libraries. ");
+}
+#endif
+
+// dataset.cpp
+#if defined(ARROW_R_WITH_DATASET)
std::shared_ptr<arrow::Schema> dataset___ScannerBuilder__schema(const
std::shared_ptr<ds::ScannerBuilder>& sb);
extern "C" SEXP _arrow_dataset___ScannerBuilder__schema(SEXP sb_sexp){
BEGIN_CPP11
@@ -4222,7 +4272,9 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_dataset___IpcFileWriteOptions__update2", (DL_FUNC)
&_arrow_dataset___IpcFileWriteOptions__update2, 4},
{ "_arrow_dataset___IpcFileWriteOptions__update1", (DL_FUNC)
&_arrow_dataset___IpcFileWriteOptions__update1, 3},
{ "_arrow_dataset___IpcFileFormat__Make", (DL_FUNC)
&_arrow_dataset___IpcFileFormat__Make, 0},
- { "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC)
&_arrow_dataset___CsvFileFormat__Make, 1},
+ { "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC)
&_arrow_dataset___CsvFileFormat__Make, 3},
+ { "_arrow_dataset___FragmentScanOptions__type_name", (DL_FUNC)
&_arrow_dataset___FragmentScanOptions__type_name, 1},
+ { "_arrow_dataset___CsvFragmentScanOptions__Make", (DL_FUNC)
&_arrow_dataset___CsvFragmentScanOptions__Make, 2},
{ "_arrow_dataset___DirectoryPartitioning", (DL_FUNC)
&_arrow_dataset___DirectoryPartitioning, 1},
{ "_arrow_dataset___DirectoryPartitioning__MakeFactory",
(DL_FUNC) &_arrow_dataset___DirectoryPartitioning__MakeFactory, 1},
{ "_arrow_dataset___HivePartitioning", (DL_FUNC)
&_arrow_dataset___HivePartitioning, 2},
@@ -4232,6 +4284,7 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_dataset___ScannerBuilder__Filter", (DL_FUNC)
&_arrow_dataset___ScannerBuilder__Filter, 2},
{ "_arrow_dataset___ScannerBuilder__UseThreads", (DL_FUNC)
&_arrow_dataset___ScannerBuilder__UseThreads, 2},
{ "_arrow_dataset___ScannerBuilder__BatchSize", (DL_FUNC)
&_arrow_dataset___ScannerBuilder__BatchSize, 2},
+ { "_arrow_dataset___ScannerBuilder__FragmentScanOptions",
(DL_FUNC) &_arrow_dataset___ScannerBuilder__FragmentScanOptions, 2},
{ "_arrow_dataset___ScannerBuilder__schema", (DL_FUNC)
&_arrow_dataset___ScannerBuilder__schema, 1},
{ "_arrow_dataset___ScannerBuilder__Finish", (DL_FUNC)
&_arrow_dataset___ScannerBuilder__Finish, 1},
{ "_arrow_dataset___Scanner__ToTable", (DL_FUNC)
&_arrow_dataset___Scanner__ToTable, 1},
diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp
index 83c7cbb..89c3e4d 100644
--- a/r/src/dataset.cpp
+++ b/r/src/dataset.cpp
@@ -272,12 +272,36 @@ std::shared_ptr<ds::IpcFileFormat>
dataset___IpcFileFormat__Make() {
// [[dataset::export]]
std::shared_ptr<ds::CsvFileFormat> dataset___CsvFileFormat__Make(
- const std::shared_ptr<arrow::csv::ParseOptions>& parse_options) {
+ const std::shared_ptr<arrow::csv::ParseOptions>& parse_options,
+ const std::shared_ptr<arrow::csv::ConvertOptions>& convert_options,
+ const std::shared_ptr<arrow::csv::ReadOptions>& read_options) {
auto format = std::make_shared<ds::CsvFileFormat>();
format->parse_options = *parse_options;
+ auto scan_options = std::make_shared<ds::CsvFragmentScanOptions>();
+ if (convert_options) scan_options->convert_options = *convert_options;
+ if (read_options) scan_options->read_options = *read_options;
+ format->default_fragment_scan_options = std::move(scan_options);
return format;
}
+// FragmentScanOptions, CsvFragmentScanOptions
+
+// [[dataset::export]]
+std::string dataset___FragmentScanOptions__type_name(
+ const std::shared_ptr<ds::FragmentScanOptions>& fragment_scan_options) {
+ return fragment_scan_options->type_name();
+}
+
+// [[dataset::export]]
+std::shared_ptr<ds::CsvFragmentScanOptions>
dataset___CsvFragmentScanOptions__Make(
+ const std::shared_ptr<arrow::csv::ConvertOptions>& convert_options,
+ const std::shared_ptr<arrow::csv::ReadOptions>& read_options) {
+ auto options = std::make_shared<ds::CsvFragmentScanOptions>();
+ options->convert_options = *convert_options;
+ options->read_options = *read_options;
+ return options;
+}
+
// DirectoryPartitioning, HivePartitioning
// [[dataset::export]]
@@ -347,6 +371,13 @@ void dataset___ScannerBuilder__BatchSize(const
std::shared_ptr<ds::ScannerBuilde
}
// [[dataset::export]]
+void dataset___ScannerBuilder__FragmentScanOptions(
+ const std::shared_ptr<ds::ScannerBuilder>& sb,
+ const std::shared_ptr<ds::FragmentScanOptions>& options) {
+ StopIfNotOk(sb->FragmentScanOptions(options));
+}
+
+// [[dataset::export]]
std::shared_ptr<arrow::Schema> dataset___ScannerBuilder__schema(
const std::shared_ptr<ds::ScannerBuilder>& sb) {
return sb->schema();
diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R
index 67fd500..e6db0bc 100644
--- a/r/tests/testthat/test-dataset.R
+++ b/r/tests/testthat/test-dataset.R
@@ -295,6 +295,45 @@ test_that("CSV dataset", {
)
})
+test_that("CSV scan options", {
+ options <- FragmentScanOptions$create("text")
+ expect_equal(options$type, "csv")
+ options <- FragmentScanOptions$create("csv",
+ null_values = c("mynull"),
+ strings_can_be_null = TRUE)
+ expect_equal(options$type, "csv")
+
+ dst_dir <- make_temp_dir()
+ dst_file <- file.path(dst_dir, "data.csv")
+ df <- tibble(chr = c("foo", "mynull"))
+ write.csv(df, dst_file, row.names = FALSE, quote = FALSE)
+
+ ds <- open_dataset(dst_dir, format = "csv")
+ expect_equivalent(ds %>% collect(), df)
+
+ sb <- ds$NewScan()
+ sb$FragmentScanOptions(options)
+
+ tab <- sb$Finish()$ToTable()
+ expect_equivalent(as.data.frame(tab), tibble(chr = c("foo", NA)))
+
+ # Set default convert options in CsvFileFormat
+ csv_format <- CsvFileFormat$create(null_values = c("mynull"),
+ strings_can_be_null = TRUE)
+ ds <- open_dataset(dst_dir, format = csv_format)
+ expect_equivalent(ds %>% collect(), tibble(chr = c("foo", NA)))
+
+ # Set both parse and convert options
+ df <- tibble(chr = c("foo", "mynull"), chr2 = c("bar", "baz"))
+ write.table(df, dst_file, row.names = FALSE, quote = FALSE, sep = "\t")
+ ds <- open_dataset(dst_dir, format = "csv",
+                     delimiter = "\t",
+ null_values = c("mynull"),
+ strings_can_be_null = TRUE)
+ expect_equivalent(ds %>% collect(), tibble(chr = c("foo", NA),
+ chr2 = c("bar", "baz")))
+})
+
test_that("compressed CSV dataset", {
skip_if_not_available("gzip")
dst_dir <- make_temp_dir()
@@ -318,6 +357,33 @@ test_that("compressed CSV dataset", {
)
})
+test_that("CSV dataset options", {
+ dst_dir <- make_temp_dir()
+ dst_file <- file.path(dst_dir, "data.csv")
+ df <- tibble(chr = letters[1:10])
+ write.csv(df, dst_file, row.names = FALSE, quote = FALSE)
+
+ format <- FileFormat$create("csv", skip_rows = 1)
+ ds <- open_dataset(dst_dir, format = format)
+
+ expect_equivalent(
+ ds %>%
+ select(string = a) %>%
+ collect(),
+    df[-1,] %>%
+ select(string = chr)
+ )
+
+ ds <- open_dataset(dst_dir, format = "csv", column_names = c("foo"))
+
+ expect_equivalent(
+ ds %>%
+ select(string = foo) %>%
+ collect(),
+ tibble(foo = c(c('chr'), letters[1:10]))
+ )
+})
+
test_that("Other text delimited dataset", {
ds1 <- open_dataset(tsv_dir, partitioning = "part", format = "tsv")
expect_equivalent(