wgtmac commented on code in PR #48718:
URL: https://github.com/apache/arrow/pull/48718#discussion_r2684718416
##########
cpp/src/arrow/csv/writer.cc:
##########
@@ -590,6 +590,13 @@ class CSVWriterImpl : public ipc::RecordBatchWriter {
return Status::OK();
}
+ // GH-36889: Write buffer to sink and clear it to avoid stale content
+ // being written again if the next batch is empty.
+ Status WriteAndClearBuffer() {
Review Comment:
nit: perhaps rename this to `FlushToSink`, which is shorter and carries the
implication that the internal buffer is both flushed and cleared.
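For illustration, the renamed helper might read roughly like this (a sketch only; `sink_` and `data_buffer_` are assumed member names for the writer's output stream and reusable buffer, not necessarily this PR's exact fields):
```cpp
// Sketch, not the PR's code: flush the accumulated bytes to the sink, then
// shrink the buffer to zero so an empty follow-up batch cannot re-emit
// stale content.
Status FlushToSink() {
  RETURN_NOT_OK(sink_->Write(data_buffer_));
  return data_buffer_->Resize(0, /*shrink_to_fit=*/false);
}
```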
##########
cpp/src/arrow/csv/writer_test.cc:
##########
@@ -405,5 +406,46 @@ INSTANTIATE_TEST_SUITE_P(
"\n2016-02-29 10:42:23-0700,2016-02-29
17:42:23Z\n")));
#endif
+// GH-36889: Empty batches at the start should not cause duplicate headers
+TEST(TestWriteCSV, EmptyBatchAtStart) {
+ auto schema = arrow::schema({field("col1", utf8())});
+ auto empty_batch = RecordBatchFromJSON(schema, "[]");
+ auto data_batch = RecordBatchFromJSON(schema, R"([{"col1": "a"}, {"col1": "b"}])");
+
+ // Concatenate empty table with data table
+ ASSERT_OK_AND_ASSIGN(auto empty_table, Table::FromRecordBatches(schema, {empty_batch}));
+ ASSERT_OK_AND_ASSIGN(auto data_table, Table::FromRecordBatches(schema, {data_batch}));
+ ASSERT_OK_AND_ASSIGN(auto combined_table, ConcatenateTables({empty_table, data_table}));
+
+ ASSERT_OK_AND_ASSIGN(auto out, io::BufferOutputStream::Create());
+ ASSERT_OK(WriteCSV(*combined_table, WriteOptions::Defaults(), out.get()));
+ ASSERT_OK_AND_ASSIGN(auto buffer, out->Finish());
+
+ std::string result(reinterpret_cast<const char*>(buffer->data()), buffer->size());
+ // Should have exactly one header, not two
+ EXPECT_EQ(result, "\"col1\"\n\"a\"\n\"b\"\n");
+}
+
+// GH-36889: Empty batches in the middle should not cause issues
+TEST(TestWriteCSV, EmptyBatchInMiddle) {
+ auto schema = arrow::schema({field("col1", utf8())});
+ auto batch1 = RecordBatchFromJSON(schema, R"([{"col1": "a"}])");
+ auto empty_batch = RecordBatchFromJSON(schema, "[]");
+ auto batch2 = RecordBatchFromJSON(schema, R"([{"col1": "b"}])");
+
+ ASSERT_OK_AND_ASSIGN(auto table1, Table::FromRecordBatches(schema, {batch1}));
+ ASSERT_OK_AND_ASSIGN(auto empty_table, Table::FromRecordBatches(schema, {empty_batch}));
+ ASSERT_OK_AND_ASSIGN(auto table2, Table::FromRecordBatches(schema, {batch2}));
+ ASSERT_OK_AND_ASSIGN(auto combined_table,
+ ConcatenateTables({table1, empty_table, table2}));
+
+ ASSERT_OK_AND_ASSIGN(auto out, io::BufferOutputStream::Create());
+ ASSERT_OK(WriteCSV(*combined_table, WriteOptions::Defaults(), out.get()));
+ ASSERT_OK_AND_ASSIGN(auto buffer, out->Finish());
+
+ std::string result(reinterpret_cast<const char*>(buffer->data()), buffer->size());
+ EXPECT_EQ(result, "\"col1\"\n\"a\"\n\"b\"\n");
+}
Review Comment:
```suggestion
TEST(TestWriteCSV, EmptyBatchShouldNotPolluteOutput) {
  auto schema = arrow::schema({field("col1", utf8())});
  auto empty_batch = RecordBatchFromJSON(schema, "[]");
  auto batch_a = RecordBatchFromJSON(schema, R"([{"col1": "a"}])");
  auto batch_b = RecordBatchFromJSON(schema, R"([{"col1": "b"}])");
  struct TestParam {
    std::shared_ptr<Table> table;
    std::string expected_output;
  };
  std::vector<TestParam> test_params = {
      // Empty batch in the beginning
      {Table::FromRecordBatches(schema, {empty_batch, batch_a, batch_b}).ValueOrDie(),
       "\"col1\"\n\"a\"\n\"b\"\n"},
      // Empty batch in the middle
      {Table::FromRecordBatches(schema, {batch_a, empty_batch, batch_b}).ValueOrDie(),
       "\"col1\"\n\"a\"\n\"b\"\n"},
      // Empty batch in the end
      {Table::FromRecordBatches(schema, {batch_a, batch_b, empty_batch}).ValueOrDie(),
       "\"col1\"\n\"a\"\n\"b\"\n"},
  };
  for (const auto& param : test_params) {
    ASSERT_OK_AND_ASSIGN(auto out, io::BufferOutputStream::Create());
    ASSERT_OK(WriteCSV(*param.table, WriteOptions::Defaults(), out.get()));
    ASSERT_OK_AND_ASSIGN(auto buffer, out->Finish());
    EXPECT_EQ(buffer->ToString(), param.expected_output);
  }
}
```
These two cases are so similar that I'd recommend combining them into a single
test with different params, as in the suggestion above.
##########
python/pyarrow/tests/test_csv.py:
##########
@@ -2065,3 +2065,36 @@ def readinto(self, *args):
for i in range(20):
with pytest.raises(pa.ArrowInvalid):
read_csv(MyBytesIO(data))
+
+
+def test_write_csv_empty_batch_no_duplicate_header():
Review Comment:
I'm not an expert in pyarrow, but I'd recommend consolidating the similar cases here as well.
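For example, the empty-batch positions could be folded into one parametrized test. This is just a sketch of the idea (the batch layouts are illustrative, and it assumes pyarrow's default `write_csv` options quote string values):
```python
import io

import pyarrow as pa
import pytest
from pyarrow import csv


# Sketch of the consolidation idea, not the PR's actual test: one case per
# position of the empty batch (start, middle, end).
@pytest.mark.parametrize("rows_per_batch", [
    ([], ["a"], ["b"]),   # empty batch at the start
    (["a"], [], ["b"]),   # empty batch in the middle
    (["a"], ["b"], []),   # empty batch at the end
])
def test_write_csv_empty_batch_no_duplicate_header(rows_per_batch):
    batches = [
        pa.record_batch([pa.array(rows, pa.string())], names=["col1"])
        for rows in rows_per_batch
    ]
    table = pa.Table.from_batches(batches)
    sink = io.BytesIO()
    csv.write_csv(table, sink)
    # Exactly one header and no stale bytes from the empty batch.
    assert sink.getvalue() == b'"col1"\n"a"\n"b"\n'
```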