jorisvandenbossche commented on a change in pull request #8305:
URL: https://github.com/apache/arrow/pull/8305#discussion_r501121145
##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1054,18 +1095,173 @@ cdef class ParquetReadOptions(_Weakrefable):
return False
+cdef class ParquetFileWriteOptions(FileWriteOptions):
+
+ cdef:
+ CParquetFileWriteOptions* parquet_options
+ object _properties
+
+ def update(self, **kwargs):
+ cdef CParquetFileWriteOptions* opts = self.parquet_options
+
+ arrow_fields = {
+ "use_deprecated_int96_timestamps",
+ "coerce_timestamps",
+ "allow_truncated_timestamps",
+ }
+
+ update = False
+ update_arrow = False
+ for name, value in kwargs.items():
+ assert name in self._properties
+ self._properties[name] = value
+ if name in arrow_fields:
+ update_arrow = True
+ else:
+ update = True
+
+ if update:
+ opts.writer_properties = _create_writer_properties(
+ use_dictionary=self.use_dictionary,
+ compression=self.compression,
+ version=self.version,
+ write_statistics=self.write_statistics,
+ data_page_size=self.data_page_size,
+ compression_level=self.compression_level,
+ use_byte_stream_split=self.use_byte_stream_split,
+ data_page_version=self.data_page_version,
+ )
+
+ if update_arrow:
+ opts.arrow_writer_properties = _create_arrow_writer_properties(
+ use_deprecated_int96_timestamps=(
+ self.use_deprecated_int96_timestamps
+ ),
+ coerce_timestamps=self.coerce_timestamps,
+ allow_truncated_timestamps=self.allow_truncated_timestamps,
+ writer_engine_version=self.writer_engine_version,
+ )
+
+ @property
+ def use_dictionary(self):
+ return self._properties['use_dictionary']
+
+ @use_dictionary.setter
+ def use_dictionary(self, use_dictionary):
+ self.update(use_dictionary=use_dictionary)
Review comment:
Are those getters/setters needed for the implementation, or only added
for user convenience?
But since we don't test it specifically, I would maybe rather leave it out
for now (to limit the API surface that we need to maintain, can always add this
if there is user demand for it)?
##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1054,18 +1095,173 @@ cdef class ParquetReadOptions(_Weakrefable):
return False
+cdef class ParquetFileWriteOptions(FileWriteOptions):
+
+ cdef:
+ CParquetFileWriteOptions* parquet_options
+ object _properties
+
+ def update(self, **kwargs):
+ cdef CParquetFileWriteOptions* opts = self.parquet_options
+
+ arrow_fields = {
+ "use_deprecated_int96_timestamps",
+ "coerce_timestamps",
+ "allow_truncated_timestamps",
+ }
+
+ update = False
+ update_arrow = False
+ for name, value in kwargs.items():
+ assert name in self._properties
Review comment:
Instead of an assert, maybe raise a TypeError that such keyword is not
supported?
##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1054,18 +1095,173 @@ cdef class ParquetReadOptions(_Weakrefable):
return False
+cdef class ParquetFileWriteOptions(FileWriteOptions):
+
+ cdef:
+ CParquetFileWriteOptions* parquet_options
+ object _properties
+
+ def update(self, **kwargs):
+ cdef CParquetFileWriteOptions* opts = self.parquet_options
+
+ arrow_fields = {
+ "use_deprecated_int96_timestamps",
+ "coerce_timestamps",
+ "allow_truncated_timestamps",
+ }
+
+ update = False
+ update_arrow = False
+ for name, value in kwargs.items():
+ assert name in self._properties
Review comment:
```suggestion
if name not in self._properties:
raise TypeError("unexpected Parquet write option:
{}".format(name))
```
##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2105,68 +2288,41 @@ def _get_partition_keys(Expression
partition_expression):
def _filesystemdataset_write(
- data, object base_dir, Schema schema not None,
- FileFormat format not None, FileSystem filesystem not None,
- Partitioning partitioning not None, bint use_threads=True,
+ data not None, object base_dir not None, str basename_template not None,
+ Schema schema not None, FileFormat format not None,
+ FileSystem filesystem not None, Partitioning partitioning not None,
+ FileWriteOptions file_options not None, bint use_threads,
):
"""
CFileSystemDataset.Write wrapper
"""
cdef:
- c_string c_base_dir
- shared_ptr[CSchema] c_schema
- shared_ptr[CFileFormat] c_format
- shared_ptr[CFileSystem] c_filesystem
- shared_ptr[CPartitioning] c_partitioning
- shared_ptr[CScanContext] c_context
- # to create iterator of InMemory fragments
+ CFileSystemDatasetWriteOptions c_options
+ shared_ptr[CScanner] c_scanner
vector[shared_ptr[CRecordBatch]] c_batches
- shared_ptr[CFragment] c_fragment
- vector[shared_ptr[CFragment]] c_fragment_vector
- c_base_dir = tobytes(_stringify_path(base_dir))
- c_schema = pyarrow_unwrap_schema(schema)
- c_format = format.unwrap()
- c_filesystem = filesystem.unwrap()
- c_partitioning = partitioning.unwrap()
- c_context = _build_scan_context(use_threads=use_threads)
+ c_options.file_write_options = file_options.unwrap()
+ c_options.filesystem = filesystem.unwrap()
+ c_options.base_dir = tobytes(_stringify_path(base_dir))
+ c_options.partitioning = partitioning.unwrap()
+ c_options.basename_template = tobytes(basename_template)
Review comment:
The `FileFormat` is not part of the c_options? As that argument now
seems to be ignored?
##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2105,68 +2288,41 @@ def _get_partition_keys(Expression
partition_expression):
def _filesystemdataset_write(
- data, object base_dir, Schema schema not None,
- FileFormat format not None, FileSystem filesystem not None,
- Partitioning partitioning not None, bint use_threads=True,
+ data not None, object base_dir not None, str basename_template not None,
+ Schema schema not None, FileFormat format not None,
+ FileSystem filesystem not None, Partitioning partitioning not None,
+ FileWriteOptions file_options not None, bint use_threads,
):
"""
CFileSystemDataset.Write wrapper
"""
cdef:
- c_string c_base_dir
- shared_ptr[CSchema] c_schema
- shared_ptr[CFileFormat] c_format
- shared_ptr[CFileSystem] c_filesystem
- shared_ptr[CPartitioning] c_partitioning
- shared_ptr[CScanContext] c_context
- # to create iterator of InMemory fragments
+ CFileSystemDatasetWriteOptions c_options
+ shared_ptr[CScanner] c_scanner
vector[shared_ptr[CRecordBatch]] c_batches
- shared_ptr[CFragment] c_fragment
- vector[shared_ptr[CFragment]] c_fragment_vector
- c_base_dir = tobytes(_stringify_path(base_dir))
- c_schema = pyarrow_unwrap_schema(schema)
- c_format = format.unwrap()
- c_filesystem = filesystem.unwrap()
- c_partitioning = partitioning.unwrap()
- c_context = _build_scan_context(use_threads=use_threads)
+ c_options.file_write_options = file_options.unwrap()
+ c_options.filesystem = filesystem.unwrap()
+ c_options.base_dir = tobytes(_stringify_path(base_dir))
+ c_options.partitioning = partitioning.unwrap()
+ c_options.basename_template = tobytes(basename_template)
Review comment:
OK, I suppose the format is embedded in the FileWriteOptions. But then
FileFormat should not be passed here, I think? And we should also check that
FileFormat and FileWriteOptions don't conflict if both are passed?
##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -2105,68 +2288,41 @@ def _get_partition_keys(Expression
partition_expression):
def _filesystemdataset_write(
- data, object base_dir, Schema schema not None,
- FileFormat format not None, FileSystem filesystem not None,
- Partitioning partitioning not None, bint use_threads=True,
+ data not None, object base_dir not None, str basename_template not None,
+ Schema schema not None, FileFormat format not None,
+ FileSystem filesystem not None, Partitioning partitioning not None,
+ FileWriteOptions file_options not None, bint use_threads,
):
"""
CFileSystemDataset.Write wrapper
"""
cdef:
- c_string c_base_dir
- shared_ptr[CSchema] c_schema
- shared_ptr[CFileFormat] c_format
- shared_ptr[CFileSystem] c_filesystem
- shared_ptr[CPartitioning] c_partitioning
- shared_ptr[CScanContext] c_context
- # to create iterator of InMemory fragments
+ CFileSystemDatasetWriteOptions c_options
+ shared_ptr[CScanner] c_scanner
vector[shared_ptr[CRecordBatch]] c_batches
- shared_ptr[CFragment] c_fragment
- vector[shared_ptr[CFragment]] c_fragment_vector
- c_base_dir = tobytes(_stringify_path(base_dir))
- c_schema = pyarrow_unwrap_schema(schema)
- c_format = format.unwrap()
- c_filesystem = filesystem.unwrap()
- c_partitioning = partitioning.unwrap()
- c_context = _build_scan_context(use_threads=use_threads)
+ c_options.file_write_options = file_options.unwrap()
+ c_options.filesystem = filesystem.unwrap()
+ c_options.base_dir = tobytes(_stringify_path(base_dir))
+ c_options.partitioning = partitioning.unwrap()
+ c_options.basename_template = tobytes(basename_template)
if isinstance(data, Dataset):
- with nogil:
- check_status(
- CFileSystemDataset.Write(
- c_schema,
- c_format,
- c_filesystem,
- c_base_dir,
- c_partitioning,
- c_context,
- (<Dataset> data).dataset.GetFragments()
- )
- )
+ scanner = data._scanner(use_threads=use_threads)
else:
- # data is list of batches/tables, one element per fragment
+ # data is list of batches/tables
for table in data:
if isinstance(table, Table):
for batch in table.to_batches():
c_batches.push_back((<RecordBatch> batch).sp_batch)
else:
c_batches.push_back((<RecordBatch> table).sp_batch)
- c_fragment = shared_ptr[CFragment](
- new CInMemoryFragment(c_batches, _true.unwrap()))
- c_batches.clear()
- c_fragment_vector.push_back(c_fragment)
+ data = Fragment.wrap(shared_ptr[CFragment](
+ new CInMemoryFragment(move(c_batches), _true.unwrap())))
Review comment:
Does this change behaviour? It seems you are now creating a single
fragment instead of a vector of fragments?
##########
File path: python/pyarrow/dataset.py
##########
@@ -741,6 +746,12 @@ def write_dataset(data, base_dir, format=None,
partitioning=None, schema=None,
)
format = _ensure_format(format)
+ if basename_template is None:
+ basename_template = "dat_{i}." + format.default_extname
Review comment:
Maybe we could go with `"part-{i}."` for the default name (that seems
more in line with eg dask / spark)
(I can also do this in a follow-up PR, if we want this)
##########
File path: python/pyarrow/_dataset.pyx
##########
@@ -1054,18 +1095,173 @@ cdef class ParquetReadOptions(_Weakrefable):
return False
+cdef class ParquetFileWriteOptions(FileWriteOptions):
+
+ cdef:
+ CParquetFileWriteOptions* parquet_options
+ object _properties
+
+ def update(self, **kwargs):
+ cdef CParquetFileWriteOptions* opts = self.parquet_options
+
+ arrow_fields = {
+ "use_deprecated_int96_timestamps",
+ "coerce_timestamps",
+ "allow_truncated_timestamps",
+ }
+
+ update = False
+ update_arrow = False
+ for name, value in kwargs.items():
+ assert name in self._properties
+ self._properties[name] = value
+ if name in arrow_fields:
+ update_arrow = True
+ else:
+ update = True
+
+ if update:
+ opts.writer_properties = _create_writer_properties(
+ use_dictionary=self.use_dictionary,
+ compression=self.compression,
+ version=self.version,
+ write_statistics=self.write_statistics,
+ data_page_size=self.data_page_size,
+ compression_level=self.compression_level,
+ use_byte_stream_split=self.use_byte_stream_split,
+ data_page_version=self.data_page_version,
+ )
+
+ if update_arrow:
+ opts.arrow_writer_properties = _create_arrow_writer_properties(
+ use_deprecated_int96_timestamps=(
+ self.use_deprecated_int96_timestamps
+ ),
+ coerce_timestamps=self.coerce_timestamps,
+ allow_truncated_timestamps=self.allow_truncated_timestamps,
+ writer_engine_version=self.writer_engine_version,
+ )
+
+ @property
+ def use_dictionary(self):
+ return self._properties['use_dictionary']
+
+ @use_dictionary.setter
+ def use_dictionary(self, use_dictionary):
+ self.update(use_dictionary=use_dictionary)
Review comment:
Ah, I see now they are actually used in the `update` to get the values
out of the `_properties`. But that's only the property, not the setter, that is
used? (and could in principle also use `_properties` directly there)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]