This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 0d0e068da0 GH-36889: [C++][Python] Fix duplicate CSV header when first
batch is empty (#48718)
0d0e068da0 is described below
commit 0d0e068da0904918e646f301fa75e75f66a6827b
Author: Ruiyang Wang <[email protected]>
AuthorDate: Tue Jan 27 07:18:16 2026 -0800
GH-36889: [C++][Python] Fix duplicate CSV header when first batch is empty
(#48718)
### Rationale for this change
Fixes https://github.com/apache/arrow/issues/36889
When writing CSV from a table where the first batch is empty, the header
gets written twice:
```python
table = pa.table({"col1": ["a", "b", "c"]})
combined = pa.concat_tables([table.schema.empty_table(), table])
write_csv(combined, buf)
# Result: "col1"\n"col1"\n"a"\n"b"\n"c"\n <-- header appears twice
```
### What changes are included in this PR?
The bug happens because:
1. Header is written to `data_buffer_` and flushed during `CSVWriterImpl`
initialization
2. The buffer is not cleared after flush
3. When the next batch is empty, `TranslateMinimalBatch` returns early
without modifying `data_buffer_`
4. The write loop then writes `data_buffer_`, which still contains stale
content (see the sketch below)
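For illustration, here is a minimal, self-contained sketch of that pattern.
It is a toy writer using `std::string` stand-ins for the sink and
`data_buffer_`, not the actual `CSVWriterImpl`:

```cpp
// Toy illustration of the stale-buffer pattern described above
// (standalone sketch, not Arrow code).
#include <iostream>
#include <string>
#include <vector>

int main() {
  std::string sink;    // stands in for the output stream
  std::string buffer;  // stands in for data_buffer_

  // "WriteHeader": fill the buffer and flush it, but never clear it.
  buffer = "\"col1\"\n";
  sink += buffer;

  // Batches: an empty batch is skipped and leaves the buffer untouched.
  std::vector<std::vector<std::string>> batches = {{}, {"a"}, {"b"}, {"c"}};
  for (const auto& batch : batches) {
    if (!batch.empty()) {
      buffer.clear();
      for (const auto& value : batch) buffer += "\"" + value + "\"\n";
    }
    sink += buffer;  // for the empty batch this re-writes the stale header
  }

  std::cout << sink;  // "col1"\n"col1"\n"a"\n"b"\n"c"\n -- header twice
  return 0;
}
```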
The fix introduces a `FlushToSink()` helper that writes the buffer to the
sink and then clears it. This helper is used in all write paths:
- `WriteHeader()`
- `WriteRecordBatch()`
- `WriteTable()`
This ensures the buffer is always clean after any flush, making it
impossible for stale content to be written again.
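In the same toy terms, routing every flush through a clear-after-write helper
removes the duplicate output. This is only a sketch of the idea (the helper
name `FlushAndClear` is made up here); the real change is `FlushToSink()` in
`writer.cc` below:

```cpp
// Same toy writer, with every flush going through a helper that clears
// the buffer after writing (mirroring the idea behind FlushToSink()).
#include <iostream>
#include <string>
#include <vector>

void FlushAndClear(std::string& buffer, std::string& sink) {
  sink += buffer;
  buffer.clear();  // nothing stale survives into the next iteration
}

int main() {
  std::string sink;
  std::string buffer = "\"col1\"\n";
  FlushAndClear(buffer, sink);  // header flushed once, buffer now empty

  std::vector<std::vector<std::string>> batches = {{}, {"a"}, {"b"}, {"c"}};
  for (const auto& batch : batches) {
    for (const auto& value : batch) buffer += "\"" + value + "\"\n";
    FlushAndClear(buffer, sink);  // an empty batch flushes an empty buffer
  }

  std::cout << sink;  // "col1"\n"a"\n"b"\n"c"\n -- header once
  return 0;
}
```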
### Are these changes tested?
Yes. Added C++ tests in `writer_test.cc` and Python tests in `test_csv.py`
covering:
- Empty batch at the start of the table
- Empty batch in the middle of the table
- Empty batch at the end of the table
### Are there any user-facing changes?
No API changes. This is a bug fix that prevents duplicate headers when
writing CSV from tables with empty batches.
* GitHub Issue: #36889
Lead-authored-by: Ruiyang Wang <[email protected]>
Co-authored-by: Ruiyang Wang <[email protected]>
Co-authored-by: Gang Wu <[email protected]>
Signed-off-by: Gang Wu <[email protected]>
---
cpp/src/arrow/csv/writer.cc | 13 ++++++++++---
cpp/src/arrow/csv/writer_test.cc | 32 ++++++++++++++++++++++++++++++++
python/pyarrow/tests/test_csv.py | 34 ++++++++++++++++++++++++++++++++++
3 files changed, 76 insertions(+), 3 deletions(-)
diff --git a/cpp/src/arrow/csv/writer.cc b/cpp/src/arrow/csv/writer.cc
index 5d14fe4b9b..2db0dba2de 100644
--- a/cpp/src/arrow/csv/writer.cc
+++ b/cpp/src/arrow/csv/writer.cc
@@ -541,7 +541,7 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
for (auto maybe_slice : iterator) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> slice, maybe_slice);
RETURN_NOT_OK(TranslateMinimalBatch(*slice));
- RETURN_NOT_OK(sink_->Write(data_buffer_));
+ RETURN_NOT_OK(FlushToSink());
stats_.num_record_batches++;
}
return Status::OK();
@@ -554,7 +554,7 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
RETURN_NOT_OK(reader.ReadNext(&batch));
while (batch != nullptr) {
RETURN_NOT_OK(TranslateMinimalBatch(*batch));
- RETURN_NOT_OK(sink_->Write(data_buffer_));
+ RETURN_NOT_OK(FlushToSink());
RETURN_NOT_OK(reader.ReadNext(&batch));
stats_.num_record_batches++;
}
@@ -590,6 +590,13 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
return Status::OK();
}
+ // GH-36889: Flush buffer to sink and clear it to avoid stale content
+ // being written again if the next batch is empty.
+ Status FlushToSink() {
+ RETURN_NOT_OK(sink_->Write(data_buffer_));
+ return data_buffer_->Resize(0, /*shrink_to_fit=*/false);
+ }
+
int64_t CalculateHeaderSize(QuotingStyle quoting_style) const {
int64_t header_length = 0;
for (int col = 0; col < schema_->num_fields(); col++) {
@@ -654,7 +661,7 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
next += options_.eol.size();
DCHECK_EQ(reinterpret_cast<uint8_t*>(next),
data_buffer_->data() + data_buffer_->size());
- return sink_->Write(data_buffer_);
+ return FlushToSink();
}
Status TranslateMinimalBatch(const RecordBatch& batch) {
diff --git a/cpp/src/arrow/csv/writer_test.cc b/cpp/src/arrow/csv/writer_test.cc
index 783d7631ab..ce4d8ab16d 100644
--- a/cpp/src/arrow/csv/writer_test.cc
+++ b/cpp/src/arrow/csv/writer_test.cc
@@ -28,6 +28,7 @@
#include "arrow/ipc/writer.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
+#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/matchers.h"
#include "arrow/type.h"
@@ -405,5 +406,36 @@ INSTANTIATE_TEST_SUITE_P(
"\n2016-02-29 10:42:23-0700,2016-02-29
17:42:23Z\n")));
#endif
+TEST(TestWriteCSV, EmptyBatchShouldNotPolluteOutput) {
+ auto schema = arrow::schema({field("col1", utf8())});
+ auto empty_batch = RecordBatchFromJSON(schema, "[]");
+ auto batch_a = RecordBatchFromJSON(schema, R"([{"col1": "a"}])");
+ auto batch_b = RecordBatchFromJSON(schema, R"([{"col1": "b"}])");
+
+ struct TestParam {
+ std::shared_ptr<Table> table;
+ std::string expected_output;
+ };
+
+ std::vector<TestParam> test_params = {
+ // Empty batch in the beginning
+ {Table::FromRecordBatches(schema, {empty_batch, batch_a, batch_b}).ValueOrDie(),
+ "\"col1\"\n\"a\"\n\"b\"\n"},
+ // Empty batch in the middle
+ {Table::FromRecordBatches(schema, {batch_a, empty_batch, batch_b}).ValueOrDie(),
+ "\"col1\"\n\"a\"\n\"b\"\n"},
+ // Empty batch in the end
+ {Table::FromRecordBatches(schema, {batch_a, batch_b, empty_batch}).ValueOrDie(),
+ "\"col1\"\n\"a\"\n\"b\"\n"},
+ };
+
+ for (const auto& param : test_params) {
+ ASSERT_OK_AND_ASSIGN(auto out, io::BufferOutputStream::Create());
+ ASSERT_OK(WriteCSV(*param.table, WriteOptions::Defaults(), out.get()));
+ ASSERT_OK_AND_ASSIGN(auto buffer, out->Finish());
+ EXPECT_EQ(buffer->ToString(), param.expected_output);
+ }
+}
+
} // namespace csv
} // namespace arrow
diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py
index f510c6dbe2..dce605c715 100644
--- a/python/pyarrow/tests/test_csv.py
+++ b/python/pyarrow/tests/test_csv.py
@@ -2065,3 +2065,37 @@ def test_read_csv_gil_deadlock():
for i in range(20):
with pytest.raises(pa.ArrowInvalid):
read_csv(MyBytesIO(data))
+
+
+@pytest.mark.parametrize("tables,expected", [
+ # GH-36889: Empty batch at the beginning
+ (
+ lambda: [pa.table({"col1": []}).cast(pa.schema([("col1", pa.string())])),
+ pa.table({"col1": ["a"]}),
+ pa.table({"col1": ["b"]})],
+ b'"col1"\n"a"\n"b"\n'
+ ),
+ # GH-36889: Empty batch in the middle
+ (
+ lambda: [pa.table({"col1": ["a"]}),
+ pa.table({"col1": []}).cast(pa.schema([("col1",
pa.string())])),
+ pa.table({"col1": ["b"]})],
+ b'"col1"\n"a"\n"b"\n'
+ ),
+ # GH-36889: Empty batch at the end
+ (
+ lambda: [pa.table({"col1": ["a"]}),
+ pa.table({"col1": ["b"]}),
+ pa.table({"col1": []}).cast(pa.schema([("col1",
pa.string())]))],
+ b'"col1"\n"a"\n"b"\n'
+ ),
+])
+def test_write_csv_empty_batch_should_not_pollute_output(tables, expected):
+ combined = pa.concat_tables(tables())
+
+ buf = io.BytesIO()
+ write_csv(combined, buf)
+ buf.seek(0)
+ result = buf.read()
+
+ assert result == expected