wjones127 commented on code in PR #15240:
URL: https://github.com/apache/arrow/pull/15240#discussion_r1064923528


##########
cpp/src/parquet/file_writer.h:
##########
@@ -91,6 +93,10 @@ class PARQUET_EXPORT RowGroupWriter {
   int64_t total_bytes_written() const;
   int64_t total_compressed_bytes() const;
 
+  /// Returns whether the current RowGroupWriter is in the buffered mode and is created
+  /// by calling ParquetFileWriter::AppendBufferedRowGroup.

Review Comment:
   ```suggestion
     /// by calling ParquetFileWriter::NewBufferedRowGroup.
   ```



##########
cpp/src/parquet/arrow/writer.cc:
##########
@@ -355,6 +381,40 @@ class FileWriterImpl : public FileWriter {
     return Status::OK();
   }
 
+  Status NewBufferedRowGroup() override {
+    if (row_group_writer_ != nullptr) {
+      PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
+    }
+    PARQUET_CATCH_NOT_OK(row_group_writer_ = 
writer_->AppendBufferedRowGroup());
+    return Status::OK();
+  }
+
+  Status WriteRecordBatch(const RecordBatch& batch) override {
+    if (batch.num_rows() == 0) {
+      return Status::OK();
+    }
+
+    if (row_group_writer_ == nullptr || !row_group_writer_->buffered()) {
+      RETURN_NOT_OK(NewBufferedRowGroup());
+    }
+
+    auto WriteBatch = [&](int64_t offset, int64_t size) {
+      int column_index_start = 0;
+      for (int i = 0; i < batch.num_columns(); i++) {
+        ChunkedArray chunkedArray(batch.column(i));
+        ARROW_ASSIGN_OR_RAISE(
+            std::unique_ptr<ArrowColumnWriterV2> writer,
+            ArrowColumnWriterV2::Make(chunkedArray, offset, size, 
schema_manifest_,
+                                      row_group_writer_, column_index_start));
+        RETURN_NOT_OK(writer->Write(&column_write_context_));
+        column_index_start += writer->leaf_count();

Review Comment:
   I think this is correct, but it would be good to test this path with data containing a struct array, so that a column with more than one leaf (`leaf_count() > 1`) is exercised.



##########
cpp/src/parquet/arrow/writer.h:
##########
@@ -114,6 +115,14 @@ class PARQUET_EXPORT FileWriter {
   virtual ::arrow::Status WriteColumnChunk(
       const std::shared_ptr<::arrow::ChunkedArray>& data) = 0;
 
+  /// \brief Start a new buffered row group.
+  ///
+  /// Returns an error if not all columns have been written.
+  virtual ::arrow::Status NewBufferedRowGroup() = 0;
+
+  /// \brief Write a RecordBatch into the buffered row group.

Review Comment:
   Should clarify:
   
    * That multiple batches can be written to the same row group.
    * What happens when you write more data than `WriterProperties.max_row_group_length`?
    * When do the batches get flushed to the output stream?
   
   ```suggestion
     /// \brief Write a RecordBatch into the buffered row group.
     ///
     /// Multiple RecordBatches can be written into the same row
     /// group through this method. 
   ```



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -4727,5 +4727,111 @@ std::vector<NestedFilterTestCase> 
GenerateMapFilteredTestCases() {
 INSTANTIATE_TEST_SUITE_P(MapFilteredReads, TestNestedSchemaFilteredReader,
                          ::testing::ValuesIn(GenerateMapFilteredTestCases()));
 
+template <typename TestType>
+class TestBufferedParquetIO : public TestParquetIO<TestType> {
+ public:
+  void WriteBufferedFile(const std::shared_ptr<Array>& values, int64_t 
batch_size) {
+    std::shared_ptr<GroupNode> schema =
+        MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
+    SchemaDescriptor descriptor;
+    ASSERT_NO_THROW(descriptor.Init(schema));
+    std::shared_ptr<::arrow::Schema> arrow_schema;
+    ArrowReaderProperties props;
+    ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
+
+    std::unique_ptr<FileWriter> writer;
+    ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
+                                        this->MakeWriter(schema), arrow_schema,
+                                        default_arrow_writer_properties(), 
&writer));
+    for (int i = 0; i < 4; i++) {
+      if (i % 2 == 0) {
+        ASSERT_OK_NO_THROW(writer->NewBufferedRowGroup());
+      }
+      std::shared_ptr<Array> sliced_array = values->Slice(i * batch_size, 
batch_size);
+      std::vector<std::shared_ptr<Array>> arrays = {sliced_array};
+      auto batch = ::arrow::RecordBatch::Make(arrow_schema, batch_size, 
arrays);
+      ASSERT_OK_NO_THROW(writer->WriteRecordBatch(*batch));
+    }
+    ASSERT_OK_NO_THROW(writer->Close());
+  }
+};
+
+TYPED_TEST_SUITE(TestBufferedParquetIO, TestTypes);
+
+TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteSmall) {
+  constexpr int64_t batch_size = SMALL_SIZE / 4;
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  this->WriteBufferedFile(values, batch_size);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values));

Review Comment:
   Also, do we test what happens when we write a record batch that is larger than `WriterProperties.max_row_group_length`?



##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -4727,5 +4727,111 @@ std::vector<NestedFilterTestCase> 
GenerateMapFilteredTestCases() {
 INSTANTIATE_TEST_SUITE_P(MapFilteredReads, TestNestedSchemaFilteredReader,
                          ::testing::ValuesIn(GenerateMapFilteredTestCases()));
 
+template <typename TestType>
+class TestBufferedParquetIO : public TestParquetIO<TestType> {
+ public:
+  void WriteBufferedFile(const std::shared_ptr<Array>& values, int64_t 
batch_size) {
+    std::shared_ptr<GroupNode> schema =
+        MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
+    SchemaDescriptor descriptor;
+    ASSERT_NO_THROW(descriptor.Init(schema));
+    std::shared_ptr<::arrow::Schema> arrow_schema;
+    ArrowReaderProperties props;
+    ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
+
+    std::unique_ptr<FileWriter> writer;
+    ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
+                                        this->MakeWriter(schema), arrow_schema,
+                                        default_arrow_writer_properties(), 
&writer));
+    for (int i = 0; i < 4; i++) {
+      if (i % 2 == 0) {
+        ASSERT_OK_NO_THROW(writer->NewBufferedRowGroup());
+      }
+      std::shared_ptr<Array> sliced_array = values->Slice(i * batch_size, 
batch_size);
+      std::vector<std::shared_ptr<Array>> arrays = {sliced_array};
+      auto batch = ::arrow::RecordBatch::Make(arrow_schema, batch_size, 
arrays);
+      ASSERT_OK_NO_THROW(writer->WriteRecordBatch(*batch));
+    }
+    ASSERT_OK_NO_THROW(writer->Close());
+  }
+};
+
+TYPED_TEST_SUITE(TestBufferedParquetIO, TestTypes);
+
+TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteSmall) {
+  constexpr int64_t batch_size = SMALL_SIZE / 4;
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  this->WriteBufferedFile(values, batch_size);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values));

Review Comment:
   If I understand correctly, the purpose of using `NewBufferedRowGroup()` between calls to `WriteRecordBatch()` is to write multiple batches to the same row group, right? Could you add an assertion here that the final number of row groups in the output is correct?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to