This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d0e068da0 GH-36889: [C++][Python] Fix duplicate CSV header when first batch is empty (#48718)
0d0e068da0 is described below

commit 0d0e068da0904918e646f301fa75e75f66a6827b
Author: Ruiyang Wang <[email protected]>
AuthorDate: Tue Jan 27 07:18:16 2026 -0800

    GH-36889: [C++][Python] Fix duplicate CSV header when first batch is empty (#48718)
    
    ### Rationale for this change
    
    Fixes https://github.com/apache/arrow/issues/36889
    
    When writing CSV from a table where the first batch is empty, the header gets written twice:
    
    ```python
    table = pa.table({"col1": ["a", "b", "c"]})
    combined = pa.concat_tables([table.schema.empty_table(), table])
    write_csv(combined, buf)
    # Result: "col1"\n"col1"\n"a"\n"b"\n"c"\n  <-- header appears twice
    ```
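
    After the fix, the same snippet emits the header exactly once. A minimal end-to-end check (a sketch; `buf` is assumed to be an `io.BytesIO`, and the default `WriteOptions` quote the header and string values):
    
    ```python
    import io
    
    import pyarrow as pa
    from pyarrow.csv import write_csv
    
    # Reproduce the original scenario: an empty leading batch followed by data.
    table = pa.table({"col1": ["a", "b", "c"]})
    combined = pa.concat_tables([table.schema.empty_table(), table])
    
    buf = io.BytesIO()
    write_csv(combined, buf)
    # With the fix, the header appears only once.
    assert buf.getvalue() == b'"col1"\n"a"\n"b"\n"c"\n'
    ```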
    
    ### What changes are included in this PR?
    
    The bug happens because:
    1. Header is written to `data_buffer_` and flushed during `CSVWriterImpl` initialization
    2. The buffer is not cleared after flush
    3. When the next batch is empty, `TranslateMinimalBatch` returns early without modifying `data_buffer_`
    4. The write loop then writes `data_buffer_` which still contains stale content
    
    The fix introduces a `FlushToSink()` helper that writes the buffer to the sink and clears it. This helper is used in all write paths:
    - `WriteHeader()`
    - `WriteRecordBatch()`
    - `WriteTable()`
    
    This ensures the buffer is always clean after any flush, making it impossible for stale content to be written again.
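    
    For the streaming path, the fixed behaviour can also be checked through the Python `pyarrow.csv.CSVWriter` binding, which should exercise the `WriteRecordBatch()` path (a sketch; the header is written when the writer is constructed, so an empty leading batch must not duplicate it):
    
    ```python
    import io
    
    import pyarrow as pa
    from pyarrow import csv
    
    schema = pa.schema([("col1", pa.string())])
    empty = pa.RecordBatch.from_arrays([pa.array([], pa.string())], schema=schema)
    data = pa.RecordBatch.from_arrays([pa.array(["a", "b"])], schema=schema)
    
    buf = io.BytesIO()
    with csv.CSVWriter(buf, schema) as writer:
        writer.write(empty)  # must not re-emit the already flushed header
        writer.write(data)
    assert buf.getvalue() == b'"col1"\n"a"\n"b"\n'
    ```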
    
    ### Are these changes tested?
    
    Yes. Added C++ tests in `writer_test.cc` and Python tests in `test_csv.py`:
    - Empty batch at start of table
    - Empty batch in middle of table
    - Empty batch at end of table
    
    ### Are there any user-facing changes?
    
    No API changes. This is a bug fix that prevents duplicate headers when writing CSV from tables with empty batches.
    
    * GitHub Issue: #36889
    
    Lead-authored-by: Ruiyang Wang <[email protected]>
    Co-authored-by: Ruiyang Wang <[email protected]>
    Co-authored-by: Gang Wu <[email protected]>
    Signed-off-by: Gang Wu <[email protected]>
---
 cpp/src/arrow/csv/writer.cc      | 13 ++++++++++---
 cpp/src/arrow/csv/writer_test.cc | 32 ++++++++++++++++++++++++++++++++
 python/pyarrow/tests/test_csv.py | 34 ++++++++++++++++++++++++++++++++++
 3 files changed, 76 insertions(+), 3 deletions(-)

diff --git a/cpp/src/arrow/csv/writer.cc b/cpp/src/arrow/csv/writer.cc
index 5d14fe4b9b..2db0dba2de 100644
--- a/cpp/src/arrow/csv/writer.cc
+++ b/cpp/src/arrow/csv/writer.cc
@@ -541,7 +541,7 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
     for (auto maybe_slice : iterator) {
       ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> slice, maybe_slice);
       RETURN_NOT_OK(TranslateMinimalBatch(*slice));
-      RETURN_NOT_OK(sink_->Write(data_buffer_));
+      RETURN_NOT_OK(FlushToSink());
       stats_.num_record_batches++;
     }
     return Status::OK();
@@ -554,7 +554,7 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
     RETURN_NOT_OK(reader.ReadNext(&batch));
     while (batch != nullptr) {
       RETURN_NOT_OK(TranslateMinimalBatch(*batch));
-      RETURN_NOT_OK(sink_->Write(data_buffer_));
+      RETURN_NOT_OK(FlushToSink());
       RETURN_NOT_OK(reader.ReadNext(&batch));
       stats_.num_record_batches++;
     }
@@ -590,6 +590,13 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
     return Status::OK();
   }
 
+  // GH-36889: Flush buffer to sink and clear it to avoid stale content
+  // being written again if the next batch is empty.
+  Status FlushToSink() {
+    RETURN_NOT_OK(sink_->Write(data_buffer_));
+    return data_buffer_->Resize(0, /*shrink_to_fit=*/false);
+  }
+
   int64_t CalculateHeaderSize(QuotingStyle quoting_style) const {
     int64_t header_length = 0;
     for (int col = 0; col < schema_->num_fields(); col++) {
@@ -654,7 +661,7 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
     next += options_.eol.size();
     DCHECK_EQ(reinterpret_cast<uint8_t*>(next),
               data_buffer_->data() + data_buffer_->size());
-    return sink_->Write(data_buffer_);
+    return FlushToSink();
   }
 
   Status TranslateMinimalBatch(const RecordBatch& batch) {
diff --git a/cpp/src/arrow/csv/writer_test.cc b/cpp/src/arrow/csv/writer_test.cc
index 783d7631ab..ce4d8ab16d 100644
--- a/cpp/src/arrow/csv/writer_test.cc
+++ b/cpp/src/arrow/csv/writer_test.cc
@@ -28,6 +28,7 @@
 #include "arrow/ipc/writer.h"
 #include "arrow/record_batch.h"
 #include "arrow/result.h"
+#include "arrow/table.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/matchers.h"
 #include "arrow/type.h"
@@ -405,5 +406,36 @@ INSTANTIATE_TEST_SUITE_P(
                          "\n2016-02-29 10:42:23-0700,2016-02-29 
17:42:23Z\n")));
 #endif
 
+TEST(TestWriteCSV, EmptyBatchShouldNotPolluteOutput) {
+  auto schema = arrow::schema({field("col1", utf8())});
+  auto empty_batch = RecordBatchFromJSON(schema, "[]");
+  auto batch_a = RecordBatchFromJSON(schema, R"([{"col1": "a"}])");
+  auto batch_b = RecordBatchFromJSON(schema, R"([{"col1": "b"}])");
+
+  struct TestParam {
+    std::shared_ptr<Table> table;
+    std::string expected_output;
+  };
+
+  std::vector<TestParam> test_params = {
+      // Empty batch in the beginning
+      {Table::FromRecordBatches(schema, {empty_batch, batch_a, batch_b}).ValueOrDie(),
+       "\"col1\"\n\"a\"\n\"b\"\n"},
+      // Empty batch in the middle
+      {Table::FromRecordBatches(schema, {batch_a, empty_batch, batch_b}).ValueOrDie(),
+       "\"col1\"\n\"a\"\n\"b\"\n"},
+      // Empty batch in the end
+      {Table::FromRecordBatches(schema, {batch_a, batch_b, empty_batch}).ValueOrDie(),
+       "\"col1\"\n\"a\"\n\"b\"\n"},
+  };
+
+  for (const auto& param : test_params) {
+    ASSERT_OK_AND_ASSIGN(auto out, io::BufferOutputStream::Create());
+    ASSERT_OK(WriteCSV(*param.table, WriteOptions::Defaults(), out.get()));
+    ASSERT_OK_AND_ASSIGN(auto buffer, out->Finish());
+    EXPECT_EQ(buffer->ToString(), param.expected_output);
+  }
+}
+
 }  // namespace csv
 }  // namespace arrow
diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py
index f510c6dbe2..dce605c715 100644
--- a/python/pyarrow/tests/test_csv.py
+++ b/python/pyarrow/tests/test_csv.py
@@ -2065,3 +2065,37 @@ def test_read_csv_gil_deadlock():
     for i in range(20):
         with pytest.raises(pa.ArrowInvalid):
             read_csv(MyBytesIO(data))
+
+
[email protected]("tables,expected", [
+    # GH-36889: Empty batch at the beginning
+    (
+        lambda: [pa.table({"col1": []}).cast(pa.schema([("col1", pa.string())])),
+                 pa.table({"col1": ["a"]}),
+                 pa.table({"col1": ["b"]})],
+        b'"col1"\n"a"\n"b"\n'
+    ),
+    # GH-36889: Empty batch in the middle
+    (
+        lambda: [pa.table({"col1": ["a"]}),
+                 pa.table({"col1": []}).cast(pa.schema([("col1", 
pa.string())])),
+                 pa.table({"col1": ["b"]})],
+        b'"col1"\n"a"\n"b"\n'
+    ),
+    # GH-36889: Empty batch at the end
+    (
+        lambda: [pa.table({"col1": ["a"]}),
+                 pa.table({"col1": ["b"]}),
+                 pa.table({"col1": []}).cast(pa.schema([("col1", 
pa.string())]))],
+        b'"col1"\n"a"\n"b"\n'
+    ),
+])
+def test_write_csv_empty_batch_should_not_pollute_output(tables, expected):
+    combined = pa.concat_tables(tables())
+
+    buf = io.BytesIO()
+    write_csv(combined, buf)
+    buf.seek(0)
+    result = buf.read()
+
+    assert result == expected
