pitrou commented on a change in pull request #9504: URL: https://github.com/apache/arrow/pull/9504#discussion_r581283432
########## File path: cpp/cmake_modules/SetupCxxFlags.cmake ########## @@ -629,3 +629,8 @@ if(MSVC) set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} ${MSVC_LINKER_FLAGS}") endif() endif() + +#-------------------------------------------------------------------------------------- +if(ARROW_COMPUTE) + add_definitions(-DARROW_HAVE_COMPUTE_MODULE) Review comment: The preferred way would to be let CMake generate it: https://github.com/apache/arrow/blob/master/cpp/src/arrow/util/config.h.cmake#L37 (also, if you just pass the option here, it might not be taken up by third-party applications including Arrow) ########## File path: python/pyarrow/includes/libarrow.pxd ########## @@ -1634,6 +1641,15 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil: CCSVReadOptions, CCSVParseOptions, CCSVConvertOptions) +# Writer is included explicity to avoid having to set additional +# C-Processor definitions in setup.py for cmake. Review comment: You shouldn't need this with the `config.h.cmake` approach. ########## File path: python/pyarrow/_csv.pyx ########## @@ -763,3 +765,86 @@ def open_csv(input_file, read_options=None, parse_options=None, move(c_convert_options), maybe_unbox_memory_pool(memory_pool)) return reader + + +cdef class WriteOptions(_Weakrefable): + """ + Options for writing CSV files. + + Parameters + ---------- + include_header : bool, optional (default True) + Whether to include the header + batch_size : int, optional (default 1024) + How many rows to process together when converting and writing + CSV + """ + cdef: + CCSVWriteOptions options + + # Avoid mistakingly creating attributes + __slots__ = () + + def __init__(self, *, include_header=None, batch_size=None): + self.options = CCSVWriteOptions.Defaults() + if include_header is not None: + self.options.include_header = include_header + if batch_size is not None: + self.options.batch_size = 1024 + + +cdef _get_write_options(WriteOptions write_options, CCSVWriteOptions* out): + if write_options is None: + out[0] = CCSVWriteOptions.Defaults() + else: + out[0] = write_options.options + + +def write_csv(data, output_file, write_options=None, + MemoryPool memory_pool=None): + """ + Review comment: This docstring misses a description. ########## File path: cpp/src/arrow/csv/writer_test.cc ########## @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "gtest/gtest.h" + +#include <memory> +#include <vector> + +#include "arrow/buffer.h" +#include "arrow/csv/writer.h" +#include "arrow/io/memory.h" +#include "arrow/record_batch.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/type.h" +#include "arrow/type_fwd.h" + +namespace arrow { +namespace csv { + +struct TestParams { + std::shared_ptr<RecordBatch> record_batch; + WriteOptions options; + std::string expected_output; +}; + +WriteOptions DefaultTestOptions(bool include_header) { + WriteOptions options; + options.batch_size = 5; + options.include_header = include_header; + return options; +} + +std::vector<TestParams> GenerateTestCases() { Review comment: Ok, thanks. ########## File path: python/pyarrow/_csv.pyx ########## @@ -763,3 +765,86 @@ def open_csv(input_file, read_options=None, parse_options=None, move(c_convert_options), maybe_unbox_memory_pool(memory_pool)) return reader + + +cdef class WriteOptions(_Weakrefable): + """ + Options for writing CSV files. + + Parameters + ---------- + include_header : bool, optional (default True) + Whether to include the header + batch_size : int, optional (default 1024) + How many rows to process together when converting and writing + CSV + """ + cdef: + CCSVWriteOptions options + + # Avoid mistakingly creating attributes + __slots__ = () + + def __init__(self, *, include_header=None, batch_size=None): + self.options = CCSVWriteOptions.Defaults() + if include_header is not None: + self.options.include_header = include_header + if batch_size is not None: + self.options.batch_size = 1024 + + +cdef _get_write_options(WriteOptions write_options, CCSVWriteOptions* out): + if write_options is None: + out[0] = CCSVWriteOptions.Defaults() + else: + out[0] = write_options.options + + +def write_csv(data, output_file, write_options=None, + MemoryPool memory_pool=None): + """ + + Parameters + ---------- + data: The data to write. + Either a pyarrow.RecordBatch or a pyarrow.Table + output_file: string, path, pyarrow.OutputStream or file-like object + The location of CSV data. + write_options: pyarrow.csv.WriteOptions + Options to configure writing the CSV file. + memory_pool: MemoryPool, optional + Pool for temporary allocations. + + Returns + ------- + None + """ + cdef: + shared_ptr[COutputStream] stream + CCSVWriteOptions c_write_options + CMemoryPool* c_memory_pool + CRecordBatch* batch + CTable* table + _get_write_options(write_options, &c_write_options) + + try: + where = _stringify_path(output_file) + except TypeError: + get_writer(output_file, &stream) + else: + c_where = tobytes(where) + stream = GetResultValue(FileOutputStream.Open(c_where)) + + c_memory_pool = maybe_unbox_memory_pool(memory_pool) + if isinstance(data, RecordBatch): + batch = (<RecordBatch>data).batch Review comment: `pyarrow_unwrap_batch` is generally preferred. ########## File path: python/pyarrow/_csv.pyx ########## @@ -763,3 +765,86 @@ def open_csv(input_file, read_options=None, parse_options=None, move(c_convert_options), maybe_unbox_memory_pool(memory_pool)) return reader + + +cdef class WriteOptions(_Weakrefable): + """ + Options for writing CSV files. + + Parameters + ---------- + include_header : bool, optional (default True) + Whether to include the header + batch_size : int, optional (default 1024) + How many rows to process together when converting and writing + CSV + """ + cdef: + CCSVWriteOptions options + + # Avoid mistakingly creating attributes + __slots__ = () + + def __init__(self, *, include_header=None, batch_size=None): + self.options = CCSVWriteOptions.Defaults() + if include_header is not None: + self.options.include_header = include_header + if batch_size is not None: + self.options.batch_size = 1024 + + +cdef _get_write_options(WriteOptions write_options, CCSVWriteOptions* out): + if write_options is None: + out[0] = CCSVWriteOptions.Defaults() + else: + out[0] = write_options.options + + +def write_csv(data, output_file, write_options=None, + MemoryPool memory_pool=None): + """ + + Parameters + ---------- + data: The data to write. + Either a pyarrow.RecordBatch or a pyarrow.Table + output_file: string, path, pyarrow.OutputStream or file-like object + The location of CSV data. + write_options: pyarrow.csv.WriteOptions + Options to configure writing the CSV file. + memory_pool: MemoryPool, optional + Pool for temporary allocations. + + Returns + ------- + None + """ + cdef: + shared_ptr[COutputStream] stream + CCSVWriteOptions c_write_options + CMemoryPool* c_memory_pool + CRecordBatch* batch + CTable* table + _get_write_options(write_options, &c_write_options) + + try: + where = _stringify_path(output_file) + except TypeError: + get_writer(output_file, &stream) + else: + c_where = tobytes(where) + stream = GetResultValue(FileOutputStream.Open(c_where)) + + c_memory_pool = maybe_unbox_memory_pool(memory_pool) + if isinstance(data, RecordBatch): + batch = (<RecordBatch>data).batch + with nogil: + check_status(WriteCSV(deref(batch), c_write_options, c_memory_pool, + stream.get())) + elif isinstance(data, Table): + table = (<Table>data).table + with nogil: + check_status(WriteCSV(deref(table), c_write_options, c_memory_pool, + stream.get())) + else: + raise ValueError(type(data)) Review comment: We should raise a proper `TypeError`, e.g.: ```python raise TypeError(f"Expected Table or RecordBatch, got '{type(data)}'") ``` ########## File path: python/pyarrow/_csv.pyx ########## @@ -763,3 +765,86 @@ def open_csv(input_file, read_options=None, parse_options=None, move(c_convert_options), maybe_unbox_memory_pool(memory_pool)) return reader + + +cdef class WriteOptions(_Weakrefable): + """ + Options for writing CSV files. + + Parameters + ---------- + include_header : bool, optional (default True) + Whether to include the header + batch_size : int, optional (default 1024) + How many rows to process together when converting and writing + CSV + """ + cdef: + CCSVWriteOptions options + + # Avoid mistakingly creating attributes + __slots__ = () + + def __init__(self, *, include_header=None, batch_size=None): + self.options = CCSVWriteOptions.Defaults() + if include_header is not None: + self.options.include_header = include_header + if batch_size is not None: + self.options.batch_size = 1024 + + +cdef _get_write_options(WriteOptions write_options, CCSVWriteOptions* out): + if write_options is None: + out[0] = CCSVWriteOptions.Defaults() + else: + out[0] = write_options.options + + +def write_csv(data, output_file, write_options=None, + MemoryPool memory_pool=None): + """ + + Parameters + ---------- + data: The data to write. + Either a pyarrow.RecordBatch or a pyarrow.Table + output_file: string, path, pyarrow.OutputStream or file-like object + The location of CSV data. + write_options: pyarrow.csv.WriteOptions + Options to configure writing the CSV file. + memory_pool: MemoryPool, optional + Pool for temporary allocations. + + Returns + ------- + None Review comment: Looking at the existing PyArrow code, we generally don't mention anything when the function doesn't return a useful value. ########## File path: python/pyarrow/_csv.pyx ########## @@ -763,3 +765,86 @@ def open_csv(input_file, read_options=None, parse_options=None, move(c_convert_options), maybe_unbox_memory_pool(memory_pool)) return reader + + +cdef class WriteOptions(_Weakrefable): + """ + Options for writing CSV files. + + Parameters + ---------- + include_header : bool, optional (default True) + Whether to include the header + batch_size : int, optional (default 1024) + How many rows to process together when converting and writing + CSV + """ + cdef: + CCSVWriteOptions options + + # Avoid mistakingly creating attributes + __slots__ = () + + def __init__(self, *, include_header=None, batch_size=None): + self.options = CCSVWriteOptions.Defaults() + if include_header is not None: + self.options.include_header = include_header + if batch_size is not None: + self.options.batch_size = 1024 + + +cdef _get_write_options(WriteOptions write_options, CCSVWriteOptions* out): + if write_options is None: + out[0] = CCSVWriteOptions.Defaults() + else: + out[0] = write_options.options + + +def write_csv(data, output_file, write_options=None, + MemoryPool memory_pool=None): + """ + + Parameters + ---------- + data: The data to write. + Either a pyarrow.RecordBatch or a pyarrow.Table + output_file: string, path, pyarrow.OutputStream or file-like object + The location of CSV data. + write_options: pyarrow.csv.WriteOptions + Options to configure writing the CSV file. + memory_pool: MemoryPool, optional + Pool for temporary allocations. + + Returns + ------- + None + """ + cdef: + shared_ptr[COutputStream] stream + CCSVWriteOptions c_write_options + CMemoryPool* c_memory_pool + CRecordBatch* batch + CTable* table + _get_write_options(write_options, &c_write_options) + + try: + where = _stringify_path(output_file) + except TypeError: + get_writer(output_file, &stream) + else: + c_where = tobytes(where) + stream = GetResultValue(FileOutputStream.Open(c_where)) + + c_memory_pool = maybe_unbox_memory_pool(memory_pool) + if isinstance(data, RecordBatch): + batch = (<RecordBatch>data).batch + with nogil: + check_status(WriteCSV(deref(batch), c_write_options, c_memory_pool, + stream.get())) + elif isinstance(data, Table): + table = (<Table>data).table Review comment: `pyarrow_unwrap_table` also ########## File path: python/pyarrow/_csv.pyx ########## @@ -763,3 +765,86 @@ def open_csv(input_file, read_options=None, parse_options=None, move(c_convert_options), maybe_unbox_memory_pool(memory_pool)) return reader + + +cdef class WriteOptions(_Weakrefable): + """ + Options for writing CSV files. + + Parameters + ---------- + include_header : bool, optional (default True) + Whether to include the header + batch_size : int, optional (default 1024) + How many rows to process together when converting and writing + CSV + """ + cdef: + CCSVWriteOptions options + + # Avoid mistakingly creating attributes + __slots__ = () + + def __init__(self, *, include_header=None, batch_size=None): + self.options = CCSVWriteOptions.Defaults() + if include_header is not None: + self.options.include_header = include_header + if batch_size is not None: + self.options.batch_size = 1024 + + +cdef _get_write_options(WriteOptions write_options, CCSVWriteOptions* out): + if write_options is None: + out[0] = CCSVWriteOptions.Defaults() + else: + out[0] = write_options.options + + +def write_csv(data, output_file, write_options=None, + MemoryPool memory_pool=None): + """ + + Parameters + ---------- + data: The data to write. + Either a pyarrow.RecordBatch or a pyarrow.Table + output_file: string, path, pyarrow.OutputStream or file-like object + The location of CSV data. + write_options: pyarrow.csv.WriteOptions + Options to configure writing the CSV file. + memory_pool: MemoryPool, optional + Pool for temporary allocations. + + Returns + ------- + None + """ + cdef: + shared_ptr[COutputStream] stream + CCSVWriteOptions c_write_options + CMemoryPool* c_memory_pool + CRecordBatch* batch + CTable* table + _get_write_options(write_options, &c_write_options) + + try: + where = _stringify_path(output_file) + except TypeError: + get_writer(output_file, &stream) + else: + c_where = tobytes(where) + stream = GetResultValue(FileOutputStream.Open(c_where)) Review comment: It seems `get_writer` should do this for string and path-like inputs already. Is there a reason you had to write this `try/except/else` switch? ########## File path: cpp/src/arrow/csv/writer.h ########## @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/csv/options.h" +#include "arrow/io/interfaces.h" +#include "arrow/record_batch.h" +#include "arrow/table.h" + +namespace arrow { +namespace csv { +// Functionality for converting Arrow data to Comma separated value text. +// This library supports all primitive types that can be cast to a StringArrays. +// It applies to following formatting rules: +// - For non-binary types no quotes surround values. Nulls are represented as the empty +// string. +// - For binary types all non-null data is quoted (and quotes within data are escaped +// with an additional quote). +// Null values are empty and unquoted. +// - LF (\n) is always used as a line ending. + +/// \brief Converts table to a CSV and writes the results to output. +/// Experimental +ARROW_EXPORT Status WriteCsv(const Table& table, const WriteOptions& options, + MemoryPool* pool, arrow::io::OutputStream* output); +/// \brief Converts batch to CSV and writes the results to output. +/// Experimental +ARROW_EXPORT Status WriteCsv(const RecordBatch& batch, const WriteOptions& options, + MemoryPool* pool, arrow::io::OutputStream* output); Review comment: Writing iteratively requires to be careful with the options (you don't want to append a new header for each subsequent batch). As you prefer, though. We can convert this later. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org