wjones127 commented on code in PR #15240:
URL: https://github.com/apache/arrow/pull/15240#discussion_r1066148359
##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -4727,5 +4727,153 @@ std::vector<NestedFilterTestCase> GenerateMapFilteredTestCases() {
INSTANTIATE_TEST_SUITE_P(MapFilteredReads, TestNestedSchemaFilteredReader,
::testing::ValuesIn(GenerateMapFilteredTestCases()));
+template <typename TestType>
+class TestBufferedParquetIO : public TestParquetIO<TestType> {
+ public:
+  void WriteBufferedFile(const std::shared_ptr<Array>& values, int64_t batch_size,
+                         int* num_row_groups) {
+    std::shared_ptr<GroupNode> schema =
+        MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
+    SchemaDescriptor descriptor;
+    ASSERT_NO_THROW(descriptor.Init(schema));
+    std::shared_ptr<::arrow::Schema> arrow_schema;
+    ArrowReaderProperties props;
+    ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
+
+    std::unique_ptr<FileWriter> writer;
+    ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
+                                        this->MakeWriter(schema), arrow_schema,
+                                        default_arrow_writer_properties(), &writer));
+    *num_row_groups = 0;
+    for (int i = 0; i < 4; i++) {
+      if (i % 2 == 0) {
+        ASSERT_OK_NO_THROW(writer->NewBufferedRowGroup());
+        (*num_row_groups)++;
+      }
+      std::shared_ptr<Array> sliced_array = values->Slice(i * batch_size, batch_size);
+      std::vector<std::shared_ptr<Array>> arrays = {sliced_array};
+      auto batch = ::arrow::RecordBatch::Make(arrow_schema, batch_size, arrays);
+      ASSERT_OK_NO_THROW(writer->WriteRecordBatch(*batch));
+    }
+    ASSERT_OK_NO_THROW(writer->Close());
+  }
+
+  void ReadAndCheckSingleColumnFile(const Array& values, int num_row_groups) {
+    std::shared_ptr<Array> out;
+
+    std::unique_ptr<FileReader> reader;
+    this->ReaderFromSink(&reader);
+    ASSERT_EQ(num_row_groups, reader->num_row_groups());
+
+    this->ReadSingleColumnFile(std::move(reader), &out);
+    AssertArraysEqual(values, *out);
+  }
+
+  void ReadAndCheckSingleColumnTable(const std::shared_ptr<Array>& values,
+                                     int num_row_groups) {
+    std::shared_ptr<::arrow::Table> out;
+    std::unique_ptr<FileReader> reader;
+    this->ReaderFromSink(&reader);
+    ASSERT_EQ(num_row_groups, reader->num_row_groups());
+
+    this->ReadTableFromFile(std::move(reader), &out);
+    ASSERT_EQ(1, out->num_columns());
+    ASSERT_EQ(values->length(), out->num_rows());
+
+    std::shared_ptr<ChunkedArray> chunked_array = out->column(0);
+    ASSERT_EQ(1, chunked_array->num_chunks());
+    auto result = chunked_array->chunk(0);
+
+    AssertArraysEqual(*values, *result);
+  }
+};
+
+TYPED_TEST_SUITE(TestBufferedParquetIO, TestTypes);
+
+TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteSmall) {
+  constexpr int64_t batch_size = SMALL_SIZE / 4;
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  this->WriteBufferedFile(values, batch_size, &num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
+  constexpr int64_t batch_size = LARGE_SIZE / 4;
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  this->WriteBufferedFile(values, batch_size, &num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
+}
+
+TEST(TestReadWriteArrow, WriteAndReadRecordBatch) {
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  // Limit the max number of rows in a row group to 10
+  auto writer_properties = WriterProperties::Builder().max_row_group_length(10)->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+
+  // Prepare schema
+  auto schema = ::arrow::schema(
+      {::arrow::field("a", ::arrow::int64()), ::arrow::field("b", ::arrow::utf8()),
+       ::arrow::field("c", ::arrow::struct_({::arrow::field("c1", ::arrow::int64()),
+                                             ::arrow::field("c2", ::arrow::utf8())}))});
Review Comment:
```suggestion
  auto schema = ::arrow::schema(
      {::arrow::field("a", ::arrow::int64()),
       ::arrow::field("b", ::arrow::struct_({::arrow::field("b1", ::arrow::int64()),
                                             ::arrow::field("b2", ::arrow::utf8())})),
       ::arrow::field("c", ::arrow::utf8())});
```
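(This reordering pairs with the data suggestion below: the struct column needs another column after it to exercise the `leaf_count` advance in the write loop.)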
##########
cpp/src/parquet/arrow/arrow_reader_writer_test.cc:
##########
@@ -4727,5 +4727,153 @@ std::vector<NestedFilterTestCase> GenerateMapFilteredTestCases() {
+TEST(TestReadWriteArrow, WriteAndReadRecordBatch) {
+  auto pool = ::arrow::default_memory_pool();
+  auto sink = CreateOutputStream();
+  // Limit the max number of rows in a row group to 10
+  auto writer_properties = WriterProperties::Builder().max_row_group_length(10)->build();
+  auto arrow_writer_properties = default_arrow_writer_properties();
+
+  // Prepare schema
+  auto schema = ::arrow::schema(
+      {::arrow::field("a", ::arrow::int64()), ::arrow::field("b", ::arrow::utf8()),
+       ::arrow::field("c", ::arrow::struct_({::arrow::field("c1", ::arrow::int64()),
+                                             ::arrow::field("c2", ::arrow::utf8())}))});
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                     *arrow_writer_properties, &parquet_schema));
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+  // Prepare data
+  auto record_batch = ::arrow::RecordBatchFromJSON(schema, R"([
+    [1, "alfa", {"c1": -3, "c2": "1"}],
+    [null, "alfa", {"c1": null, "c2": "22"}],
+    [3, "beta", {"c1": -2, "c2": "333"}],
+    [null, "gama", {"c1": null, "c2": null}],
+    [5, null, {"c1": -1, "c2": "-333"}],
+    [6, "alfa", {"c1": null, "c2": "-22"}],
+    [7, "beta", {"c1": 0, "c2": "-1"}],
+    [8, "beta", {"c1": null, "c2": null}],
+    [9, null, {"c1": 1, "c2": "0"}],
+    [null, "gama", {"c1": null, "c2": ""}],
+    [11, "foo", {"c1": 2, "c2": "1234"}],
+    [12, "bar", {"c1": null, "c2": "4321"}]
Review Comment:
In order to exercise that `leaf_count` code on the struct array, we need another array after it (since the count is added to the column index at the end of the loop below). So I suggest moving the string column to the end.
```suggestion
    [1, {"b1": -3, "b2": "1"}, "alfa"],
    [null, {"b1": null, "b2": "22"}, "alfa"],
    [3, {"b1": -2, "b2": "333"}, "beta"],
    [null, {"b1": null, "b2": null}, "gama"],
    [5, {"b1": -1, "b2": "-333"}, null],
    [6, {"b1": null, "b2": "-22"}, "alfa"],
    [7, {"b1": 0, "b2": "-1"}, "beta"],
    [8, {"b1": null, "b2": null}, "beta"],
    [9, {"b1": 1, "b2": "0"}, null],
    [null, {"b1": null, "b2": ""}, "gama"],
    [11, {"b1": 2, "b2": "1234"}, "foo"],
    [12, {"b1": null, "b2": "4321"}, "bar"]
```
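To make this concrete, here's a rough standalone sketch of the loop shape I mean (illustrative only, not the actual parquet-cpp writer internals; `CountLeaves` is a made-up helper): the flat leaf index is advanced at the end of each iteration, so the multi-leaf jump for a struct column is only observed when another column follows it.

```cpp
#include <iostream>
#include <memory>

#include "arrow/api.h"

// Hypothetical helper: count the leaf (primitive) columns under a type,
// recursing into nested children. A struct<b1: int64, b2: utf8> spans
// two Parquet leaf columns.
int CountLeaves(const ::arrow::DataType& type) {
  if (type.num_fields() == 0) return 1;
  int leaves = 0;
  for (const auto& child : type.fields()) {
    leaves += CountLeaves(*child->type());
  }
  return leaves;
}

int main() {
  // Same shape as the suggested schema: int64, struct, utf8.
  auto schema = ::arrow::schema(
      {::arrow::field("a", ::arrow::int64()),
       ::arrow::field("b", ::arrow::struct_({::arrow::field("b1", ::arrow::int64()),
                                             ::arrow::field("b2", ::arrow::utf8())})),
       ::arrow::field("c", ::arrow::utf8())});

  int column_index = 0;
  for (const auto& field : schema->fields()) {
    std::cout << field->name() << " starts at leaf column " << column_index << "\n";
    // The index is advanced by the leaf count at the *end* of the loop, so
    // the two-leaf jump for "b" is only exercised because "c" comes after it.
    column_index += CountLeaves(*field->type());
  }
  return 0;
}
```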
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]