[
https://issues.apache.org/jira/browse/PARQUET-1372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592558#comment-16592558
]
ASF GitHub Bot commented on PARQUET-1372:
-----------------------------------------
xhochy closed pull request #484: PARQUET-1372: Add an API to allow writing
RowGroups based on size
URL: https://github.com/apache/parquet-cpp/pull/484
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5b3c4603..698f6d76 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -458,7 +458,7 @@ add_custom_target(format-example
${BUILD_SUPPORT_DIR}/run_clang_format.py
${CLANG_FORMAT_BIN}
${BUILD_SUPPORT_DIR}/clang_format_exclusions.txt
- ${CMAKE_CURRENT_SOURCE_DIR}/examples/parquet-arrow)
+ ${CMAKE_CURRENT_SOURCE_DIR}/examples)
add_custom_target(format
DEPENDS format-example
@@ -474,7 +474,7 @@ add_custom_target(format
add_custom_target(check-format-examples
${BUILD_SUPPORT_DIR}/run_clang_format.py
${CLANG_FORMAT_BIN}
${BUILD_SUPPORT_DIR}/clang_format_exclusions.txt
- ${CMAKE_CURRENT_SOURCE_DIR}/examples/parquet-arrow 1)
+ ${CMAKE_CURRENT_SOURCE_DIR}/examples 1)
add_custom_target(check-format
DEPENDS check-format-examples
COMMAND
diff --git a/examples/low-level-api/CMakeLists.txt
b/examples/low-level-api/CMakeLists.txt
index 721fa9a1..64ba110e 100644
--- a/examples/low-level-api/CMakeLists.txt
+++ b/examples/low-level-api/CMakeLists.txt
@@ -17,5 +17,9 @@
if (PARQUET_BUILD_EXECUTABLES)
add_executable(reader-writer reader-writer.cc)
+ add_executable(reader-writer2 reader-writer2.cc)
+ target_include_directories(reader-writer PRIVATE .)
+ target_include_directories(reader-writer2 PRIVATE .)
target_link_libraries(reader-writer parquet_static)
+ target_link_libraries(reader-writer2 parquet_static)
endif()
diff --git a/examples/low-level-api/reader-writer.cc
b/examples/low-level-api/reader-writer.cc
index fb2ec774..09cd1377 100644
--- a/examples/low-level-api/reader-writer.cc
+++ b/examples/low-level-api/reader-writer.cc
@@ -18,19 +18,16 @@
#include <cassert>
#include <fstream>
#include <iostream>
-#include <list>
#include <memory>
-#include <arrow/io/file.h>
-#include <arrow/util/logging.h>
-
-#include <parquet/api/reader.h>
-#include <parquet/api/writer.h>
+#include <reader_writer.h>
/*
* This example describes writing and reading Parquet Files in C++ and serves
as a
* reference to the API.
* The file contains all the physical data types supported by Parquet.
+ * This example uses the RowGroupWriter API that supports writing RowGroups
optimized for
 * memory consumption
**/
/* Parquet is a structured columnar file format
@@ -46,56 +43,8 @@
**/
constexpr int NUM_ROWS_PER_ROW_GROUP = 500;
-constexpr int FIXED_LENGTH = 10;
const char PARQUET_FILENAME[] = "parquet_cpp_example.parquet";
-using parquet::Repetition;
-using parquet::Type;
-using parquet::LogicalType;
-using parquet::schema::PrimitiveNode;
-using parquet::schema::GroupNode;
-
-static std::shared_ptr<GroupNode> SetupSchema() {
- parquet::schema::NodeVector fields;
- // Create a primitive node named 'boolean_field' with type:BOOLEAN,
- // repetition:REQUIRED
- fields.push_back(PrimitiveNode::Make("boolean_field", Repetition::REQUIRED,
- Type::BOOLEAN, LogicalType::NONE));
-
- // Create a primitive node named 'int32_field' with type:INT32,
repetition:REQUIRED,
- // logical type:TIME_MILLIS
- fields.push_back(PrimitiveNode::Make("int32_field", Repetition::REQUIRED,
Type::INT32,
- LogicalType::TIME_MILLIS));
-
- // Create a primitive node named 'int64_field' with type:INT64,
repetition:REPEATED
- fields.push_back(PrimitiveNode::Make("int64_field", Repetition::REPEATED,
Type::INT64,
- LogicalType::NONE));
-
- fields.push_back(PrimitiveNode::Make("int96_field", Repetition::REQUIRED,
Type::INT96,
- LogicalType::NONE));
-
- fields.push_back(PrimitiveNode::Make("float_field", Repetition::REQUIRED,
Type::FLOAT,
- LogicalType::NONE));
-
- fields.push_back(PrimitiveNode::Make("double_field", Repetition::REQUIRED,
Type::DOUBLE,
- LogicalType::NONE));
-
- // Create a primitive node named 'ba_field' with type:BYTE_ARRAY,
repetition:OPTIONAL
- fields.push_back(PrimitiveNode::Make("ba_field", Repetition::OPTIONAL,
Type::BYTE_ARRAY,
- LogicalType::NONE));
-
- // Create a primitive node named 'flba_field' with type:FIXED_LEN_BYTE_ARRAY,
- // repetition:REQUIRED, field_length = FIXED_LENGTH
- fields.push_back(PrimitiveNode::Make("flba_field", Repetition::REQUIRED,
- Type::FIXED_LEN_BYTE_ARRAY,
LogicalType::NONE,
- FIXED_LENGTH));
-
- // Create a GroupNode named 'schema' using the primitive nodes defined above
- // This GroupNode is the root node of the schema tree
- return std::static_pointer_cast<GroupNode>(
- GroupNode::Make("schema", Repetition::REQUIRED, fields));
-}
-
int main(int argc, char** argv) {
/**********************************************************************************
PARQUET WRITER EXAMPLE
@@ -122,8 +71,7 @@ int main(int argc, char** argv) {
parquet::ParquetFileWriter::Open(out_file, schema, props);
// Append a RowGroup with a specific number of rows.
- parquet::RowGroupWriter* rg_writer =
- file_writer->AppendRowGroup(NUM_ROWS_PER_ROW_GROUP);
+ parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
// Write the Bool column
parquet::BoolWriter* bool_writer =
diff --git a/examples/low-level-api/reader-writer2.cc
b/examples/low-level-api/reader-writer2.cc
new file mode 100644
index 00000000..dded5fa1
--- /dev/null
+++ b/examples/low-level-api/reader-writer2.cc
@@ -0,0 +1,430 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cassert>
+#include <fstream>
+#include <iostream>
+#include <memory>
+
+#include <reader_writer.h>
+
+/*
+ * This example describes writing and reading Parquet Files in C++ and serves
as a
+ * reference to the API.
+ * The file contains all the physical data types supported by Parquet.
+ * This example uses the RowGroupWriter API that supports writing RowGroups
based on a
+ * certain size
+ **/
+
+/* Parquet is a structured columnar file format
+ * Parquet File = "Parquet data" + "Parquet Metadata"
+ * "Parquet data" is simply a vector of RowGroups. Each RowGroup is a batch of
rows in a
+ * columnar layout
+ * "Parquet Metadata" contains the "file schema" and attributes of the
RowGroups and their
+ * Columns
+ * "file schema" is a tree where each node is either a primitive type (leaf
nodes) or a
+ * complex (nested) type (internal nodes)
+ * For specific details, please refer to the format here:
+ * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
+ **/
+
+constexpr int NUM_ROWS = 2500000;
+constexpr int64_t ROW_GROUP_SIZE = 16 * 1024 * 1024; // 16 MB
+const char PARQUET_FILENAME[] = "parquet_cpp_example2.parquet";
+
+int main(int argc, char** argv) {
+
/**********************************************************************************
+ PARQUET WRITER EXAMPLE
+
**********************************************************************************/
+ // parquet::REQUIRED fields do not need definition and repetition level
values
+ // parquet::OPTIONAL fields require only definition level values
+ // parquet::REPEATED fields require both definition and repetition level
values
+ try {
+ // Create a local file output stream instance.
+ using FileClass = ::arrow::io::FileOutputStream;
+ std::shared_ptr<FileClass> out_file;
+ PARQUET_THROW_NOT_OK(FileClass::Open(PARQUET_FILENAME, &out_file));
+
+ // Setup the parquet schema
+ std::shared_ptr<GroupNode> schema = SetupSchema();
+
+ // Add writer properties
+ parquet::WriterProperties::Builder builder;
+ builder.compression(parquet::Compression::SNAPPY);
+ std::shared_ptr<parquet::WriterProperties> props = builder.build();
+
+ // Create a ParquetFileWriter instance
+ std::shared_ptr<parquet::ParquetFileWriter> file_writer =
+ parquet::ParquetFileWriter::Open(out_file, schema, props);
+
+ // Append a BufferedRowGroup to keep the RowGroup open until a certain size
+ parquet::RowGroupWriter* rg_writer = file_writer->AppendBufferedRowGroup();
+
+ int num_columns = file_writer->num_columns();
+ std::vector<int64_t> buffered_values_estimate(num_columns, 0);
+ for (int i = 0; i < NUM_ROWS; i++) {
+ int64_t estimated_bytes = 0;
+ // Get the estimated size of the values that are not written to a page
yet
+ for (int n = 0; n < num_columns; n++) {
+ estimated_bytes += buffered_values_estimate[n];
+ }
+
+ // We need to consider the compressed pages
+ // as well as the values that are not compressed yet
+ if ((rg_writer->total_bytes_written() +
rg_writer->total_compressed_bytes() +
+ estimated_bytes) > ROW_GROUP_SIZE) {
+ rg_writer->Close();
+ std::fill(buffered_values_estimate.begin(),
buffered_values_estimate.end(), 0);
+ rg_writer = file_writer->AppendBufferedRowGroup();
+ }
+
+ int col_id = 0;
+ // Write the Bool column
+ parquet::BoolWriter* bool_writer =
+ static_cast<parquet::BoolWriter*>(rg_writer->column(col_id));
+ bool bool_value = ((i % 2) == 0) ? true : false;
+ bool_writer->WriteBatch(1, nullptr, nullptr, &bool_value);
+ buffered_values_estimate[col_id] =
bool_writer->EstimatedBufferedValueBytes();
+
+ // Write the Int32 column
+ col_id++;
+ parquet::Int32Writer* int32_writer =
+ static_cast<parquet::Int32Writer*>(rg_writer->column(col_id));
+ int32_t int32_value = i;
+ int32_writer->WriteBatch(1, nullptr, nullptr, &int32_value);
+ buffered_values_estimate[col_id] =
int32_writer->EstimatedBufferedValueBytes();
+
+      // Write the Int64 column. Each row repeats twice.
+ col_id++;
+ parquet::Int64Writer* int64_writer =
+ static_cast<parquet::Int64Writer*>(rg_writer->column(col_id));
+ int64_t int64_value1 = 2 * i;
+ int16_t definition_level = 1;
+ int16_t repetition_level = 0;
+ int64_writer->WriteBatch(1, &definition_level, &repetition_level,
&int64_value1);
+ int64_t int64_value2 = (2 * i + 1);
+ repetition_level = 1; // start of a new record
+ int64_writer->WriteBatch(1, &definition_level, &repetition_level,
&int64_value2);
+ buffered_values_estimate[col_id] =
int64_writer->EstimatedBufferedValueBytes();
+
+ // Write the INT96 column.
+ col_id++;
+ parquet::Int96Writer* int96_writer =
+ static_cast<parquet::Int96Writer*>(rg_writer->column(col_id));
+ parquet::Int96 int96_value;
+ int96_value.value[0] = i;
+ int96_value.value[1] = i + 1;
+ int96_value.value[2] = i + 2;
+ int96_writer->WriteBatch(1, nullptr, nullptr, &int96_value);
+ buffered_values_estimate[col_id] =
int96_writer->EstimatedBufferedValueBytes();
+
+ // Write the Float column
+ col_id++;
+ parquet::FloatWriter* float_writer =
+ static_cast<parquet::FloatWriter*>(rg_writer->column(col_id));
+ float float_value = static_cast<float>(i) * 1.1f;
+ float_writer->WriteBatch(1, nullptr, nullptr, &float_value);
+ buffered_values_estimate[col_id] =
float_writer->EstimatedBufferedValueBytes();
+
+ // Write the Double column
+ col_id++;
+ parquet::DoubleWriter* double_writer =
+ static_cast<parquet::DoubleWriter*>(rg_writer->column(col_id));
+ double double_value = i * 1.1111111;
+ double_writer->WriteBatch(1, nullptr, nullptr, &double_value);
+ buffered_values_estimate[col_id] =
double_writer->EstimatedBufferedValueBytes();
+
+ // Write the ByteArray column. Make every alternate values NULL
+ col_id++;
+ parquet::ByteArrayWriter* ba_writer =
+ static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id));
+ parquet::ByteArray ba_value;
+ char hello[FIXED_LENGTH] = "parquet";
+ hello[7] = static_cast<char>(static_cast<int>('0') + i / 100);
+ hello[8] = static_cast<char>(static_cast<int>('0') + (i / 10) % 10);
+ hello[9] = static_cast<char>(static_cast<int>('0') + i % 10);
+ if (i % 2 == 0) {
+ int16_t definition_level = 1;
+ ba_value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]);
+ ba_value.len = FIXED_LENGTH;
+ ba_writer->WriteBatch(1, &definition_level, nullptr, &ba_value);
+ } else {
+ int16_t definition_level = 0;
+ ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
+ }
+ buffered_values_estimate[col_id] =
ba_writer->EstimatedBufferedValueBytes();
+
+ // Write the FixedLengthByteArray column
+ col_id++;
+ parquet::FixedLenByteArrayWriter* flba_writer =
+
static_cast<parquet::FixedLenByteArrayWriter*>(rg_writer->column(col_id));
+ parquet::FixedLenByteArray flba_value;
+ char v = static_cast<char>(i);
+ char flba[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
+ flba_value.ptr = reinterpret_cast<const uint8_t*>(&flba[0]);
+
+ flba_writer->WriteBatch(1, nullptr, nullptr, &flba_value);
+ buffered_values_estimate[col_id] =
flba_writer->EstimatedBufferedValueBytes();
+ }
+
+ // Close the RowGroupWriter
+ rg_writer->Close();
+ // Close the ParquetFileWriter
+ file_writer->Close();
+
+ // Write the bytes to file
+ DCHECK(out_file->Close().ok());
+ } catch (const std::exception& e) {
+ std::cerr << "Parquet write error: " << e.what() << std::endl;
+ return -1;
+ }
+
+
/**********************************************************************************
+ PARQUET READER EXAMPLE
+
**********************************************************************************/
+
+ try {
+ // Create a ParquetReader instance
+ std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
+ parquet::ParquetFileReader::OpenFile(PARQUET_FILENAME, false);
+
+ // Get the File MetaData
+ std::shared_ptr<parquet::FileMetaData> file_metadata =
parquet_reader->metadata();
+
+ int num_row_groups = file_metadata->num_row_groups();
+
+ // Get the number of Columns
+ int num_columns = file_metadata->num_columns();
+ assert(num_columns == 8);
+
+ std::vector<int> col_row_counts(num_columns, 0);
+
+ // Iterate over all the RowGroups in the file
+ for (int r = 0; r < num_row_groups; ++r) {
+ // Get the RowGroup Reader
+ std::shared_ptr<parquet::RowGroupReader> row_group_reader =
+ parquet_reader->RowGroup(r);
+
+ assert(row_group_reader->metadata()->total_byte_size() < ROW_GROUP_SIZE);
+
+ int64_t values_read = 0;
+ int64_t rows_read = 0;
+ int16_t definition_level;
+ int16_t repetition_level;
+ std::shared_ptr<parquet::ColumnReader> column_reader;
+ int col_id = 0;
+
+ // Get the Column Reader for the boolean column
+ column_reader = row_group_reader->Column(col_id);
+ parquet::BoolReader* bool_reader =
+ static_cast<parquet::BoolReader*>(column_reader.get());
+
+ // Read all the rows in the column
+ while (bool_reader->HasNext()) {
+ bool value;
+ // Read one value at a time. The number of rows read is returned.
values_read
+ // contains the number of non-null rows
+ rows_read = bool_reader->ReadBatch(1, nullptr, nullptr, &value,
&values_read);
+ // Ensure only one value is read
+ assert(rows_read == 1);
+ // There are no NULL values in the rows written
+ assert(values_read == 1);
+ // Verify the value written
+ bool expected_value = ((col_row_counts[col_id] % 2) == 0) ? true :
false;
+ assert(value == expected_value);
+ col_row_counts[col_id]++;
+ }
+
+ // Get the Column Reader for the Int32 column
+ col_id++;
+ column_reader = row_group_reader->Column(col_id);
+ parquet::Int32Reader* int32_reader =
+ static_cast<parquet::Int32Reader*>(column_reader.get());
+ // Read all the rows in the column
+ while (int32_reader->HasNext()) {
+ int32_t value;
+ // Read one value at a time. The number of rows read is returned.
values_read
+ // contains the number of non-null rows
+ rows_read = int32_reader->ReadBatch(1, nullptr, nullptr, &value,
&values_read);
+ // Ensure only one value is read
+ assert(rows_read == 1);
+ // There are no NULL values in the rows written
+ assert(values_read == 1);
+ // Verify the value written
+ assert(value == col_row_counts[col_id]);
+ col_row_counts[col_id]++;
+ }
+
+ // Get the Column Reader for the Int64 column
+ col_id++;
+ column_reader = row_group_reader->Column(col_id);
+ parquet::Int64Reader* int64_reader =
+ static_cast<parquet::Int64Reader*>(column_reader.get());
+ // Read all the rows in the column
+ while (int64_reader->HasNext()) {
+ int64_t value;
+ // Read one value at a time. The number of rows read is returned.
values_read
+ // contains the number of non-null rows
+ rows_read = int64_reader->ReadBatch(1, &definition_level,
&repetition_level,
+ &value, &values_read);
+ // Ensure only one value is read
+ assert(rows_read == 1);
+ // There are no NULL values in the rows written
+ assert(values_read == 1);
+ // Verify the value written
+ int64_t expected_value = col_row_counts[col_id];
+ assert(value == expected_value);
+ if ((col_row_counts[col_id] % 2) == 0) {
+ assert(repetition_level == 0);
+ } else {
+ assert(repetition_level == 1);
+ }
+ col_row_counts[col_id]++;
+ }
+
+ // Get the Column Reader for the Int96 column
+ col_id++;
+ column_reader = row_group_reader->Column(col_id);
+ parquet::Int96Reader* int96_reader =
+ static_cast<parquet::Int96Reader*>(column_reader.get());
+ // Read all the rows in the column
+ while (int96_reader->HasNext()) {
+ parquet::Int96 value;
+ // Read one value at a time. The number of rows read is returned.
values_read
+ // contains the number of non-null rows
+ rows_read = int96_reader->ReadBatch(1, nullptr, nullptr, &value,
&values_read);
+ // Ensure only one value is read
+ assert(rows_read == 1);
+ // There are no NULL values in the rows written
+ assert(values_read == 1);
+ // Verify the value written
+ parquet::Int96 expected_value;
+ expected_value.value[0] = col_row_counts[col_id];
+ expected_value.value[1] = col_row_counts[col_id] + 1;
+ expected_value.value[2] = col_row_counts[col_id] + 2;
+ for (int j = 0; j < 3; j++) {
+ assert(value.value[j] == expected_value.value[j]);
+ }
+ col_row_counts[col_id]++;
+ }
+
+ // Get the Column Reader for the Float column
+ col_id++;
+ column_reader = row_group_reader->Column(col_id);
+ parquet::FloatReader* float_reader =
+ static_cast<parquet::FloatReader*>(column_reader.get());
+ // Read all the rows in the column
+ while (float_reader->HasNext()) {
+ float value;
+ // Read one value at a time. The number of rows read is returned.
values_read
+ // contains the number of non-null rows
+ rows_read = float_reader->ReadBatch(1, nullptr, nullptr, &value,
&values_read);
+ // Ensure only one value is read
+ assert(rows_read == 1);
+ // There are no NULL values in the rows written
+ assert(values_read == 1);
+ // Verify the value written
+ float expected_value = static_cast<float>(col_row_counts[col_id]) *
1.1f;
+ assert(value == expected_value);
+ col_row_counts[col_id]++;
+ }
+
+ // Get the Column Reader for the Double column
+ col_id++;
+ column_reader = row_group_reader->Column(col_id);
+ parquet::DoubleReader* double_reader =
+ static_cast<parquet::DoubleReader*>(column_reader.get());
+ // Read all the rows in the column
+ while (double_reader->HasNext()) {
+ double value;
+ // Read one value at a time. The number of rows read is returned.
values_read
+ // contains the number of non-null rows
+ rows_read = double_reader->ReadBatch(1, nullptr, nullptr, &value,
&values_read);
+ // Ensure only one value is read
+ assert(rows_read == 1);
+ // There are no NULL values in the rows written
+ assert(values_read == 1);
+ // Verify the value written
+ double expected_value = col_row_counts[col_id] * 1.1111111;
+ assert(value == expected_value);
+ col_row_counts[col_id]++;
+ }
+
+ // Get the Column Reader for the ByteArray column
+ col_id++;
+ column_reader = row_group_reader->Column(col_id);
+ parquet::ByteArrayReader* ba_reader =
+ static_cast<parquet::ByteArrayReader*>(column_reader.get());
+ // Read all the rows in the column
+ while (ba_reader->HasNext()) {
+ parquet::ByteArray value;
+ // Read one value at a time. The number of rows read is returned.
values_read
+ // contains the number of non-null rows
+ rows_read =
+ ba_reader->ReadBatch(1, &definition_level, nullptr, &value,
&values_read);
+ // Ensure only one value is read
+ assert(rows_read == 1);
+ // Verify the value written
+ char expected_value[FIXED_LENGTH] = "parquet";
+ expected_value[7] = static_cast<char>('0' + col_row_counts[col_id] /
100);
+ expected_value[8] = static_cast<char>('0' + (col_row_counts[col_id] /
10) % 10);
+ expected_value[9] = static_cast<char>('0' + col_row_counts[col_id] %
10);
+ if (col_row_counts[col_id] % 2 == 0) { // only alternate values exist
+ // There are no NULL values in the rows written
+ assert(values_read == 1);
+ assert(value.len == FIXED_LENGTH);
+ assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0);
+ assert(definition_level == 1);
+ } else {
+ // There are NULL values in the rows written
+ assert(values_read == 0);
+ assert(definition_level == 0);
+ }
+ col_row_counts[col_id]++;
+ }
+
+ // Get the Column Reader for the FixedLengthByteArray column
+ col_id++;
+ column_reader = row_group_reader->Column(col_id);
+ parquet::FixedLenByteArrayReader* flba_reader =
+ static_cast<parquet::FixedLenByteArrayReader*>(column_reader.get());
+ // Read all the rows in the column
+ while (flba_reader->HasNext()) {
+ parquet::FixedLenByteArray value;
+ // Read one value at a time. The number of rows read is returned.
values_read
+ // contains the number of non-null rows
+ rows_read = flba_reader->ReadBatch(1, nullptr, nullptr, &value,
&values_read);
+ // Ensure only one value is read
+ assert(rows_read == 1);
+ // There are no NULL values in the rows written
+ assert(values_read == 1);
+ // Verify the value written
+ char v = static_cast<char>(col_row_counts[col_id]);
+ char expected_value[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
+ assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0);
+ col_row_counts[col_id]++;
+ }
+ }
+ } catch (const std::exception& e) {
+ std::cerr << "Parquet read error: " << e.what() << std::endl;
+ return -1;
+ }
+
+ std::cout << "Parquet Writing and Reading Complete" << std::endl;
+
+ return 0;
+}
diff --git a/examples/low-level-api/reader_writer.h
b/examples/low-level-api/reader_writer.h
new file mode 100644
index 00000000..3fda0cfa
--- /dev/null
+++ b/examples/low-level-api/reader_writer.h
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/io/file.h>
+#include <arrow/util/logging.h>
+
+#include <parquet/api/reader.h>
+#include <parquet/api/writer.h>
+
+using parquet::LogicalType;
+using parquet::Repetition;
+using parquet::Type;
+using parquet::schema::GroupNode;
+using parquet::schema::PrimitiveNode;
+
+constexpr int FIXED_LENGTH = 10;
+
+static std::shared_ptr<GroupNode> SetupSchema() {
+ parquet::schema::NodeVector fields;
+ // Create a primitive node named 'boolean_field' with type:BOOLEAN,
+ // repetition:REQUIRED
+ fields.push_back(PrimitiveNode::Make("boolean_field", Repetition::REQUIRED,
+ Type::BOOLEAN, LogicalType::NONE));
+
+ // Create a primitive node named 'int32_field' with type:INT32,
repetition:REQUIRED,
+ // logical type:TIME_MILLIS
+ fields.push_back(PrimitiveNode::Make("int32_field", Repetition::REQUIRED,
Type::INT32,
+ LogicalType::TIME_MILLIS));
+
+ // Create a primitive node named 'int64_field' with type:INT64,
repetition:REPEATED
+ fields.push_back(PrimitiveNode::Make("int64_field", Repetition::REPEATED,
Type::INT64,
+ LogicalType::NONE));
+
+ fields.push_back(PrimitiveNode::Make("int96_field", Repetition::REQUIRED,
Type::INT96,
+ LogicalType::NONE));
+
+ fields.push_back(PrimitiveNode::Make("float_field", Repetition::REQUIRED,
Type::FLOAT,
+ LogicalType::NONE));
+
+ fields.push_back(PrimitiveNode::Make("double_field", Repetition::REQUIRED,
Type::DOUBLE,
+ LogicalType::NONE));
+
+ // Create a primitive node named 'ba_field' with type:BYTE_ARRAY,
repetition:OPTIONAL
+ fields.push_back(PrimitiveNode::Make("ba_field", Repetition::OPTIONAL,
Type::BYTE_ARRAY,
+ LogicalType::NONE));
+
+ // Create a primitive node named 'flba_field' with type:FIXED_LEN_BYTE_ARRAY,
+ // repetition:REQUIRED, field_length = FIXED_LENGTH
+ fields.push_back(PrimitiveNode::Make("flba_field", Repetition::REQUIRED,
+ Type::FIXED_LEN_BYTE_ARRAY,
LogicalType::NONE,
+ FIXED_LENGTH));
+
+ // Create a GroupNode named 'schema' using the primitive nodes defined above
+ // This GroupNode is the root node of the schema tree
+ return std::static_pointer_cast<GroupNode>(
+ GroupNode::Make("schema", Repetition::REQUIRED, fields));
+}
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index 19837dbf..631bb710 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -79,8 +79,7 @@ typename std::enable_if<is_arrow_float<ArrowType>::value,
Status>::type NonNullA
size_t size, std::shared_ptr<Array>* out) {
using c_type = typename ArrowType::c_type;
std::vector<c_type> values;
- ::arrow::random_real(size, 0, static_cast<c_type>(0), static_cast<c_type>(1),
- &values);
+ ::arrow::random_real(size, 0, static_cast<c_type>(0),
static_cast<c_type>(1), &values);
::arrow::NumericBuilder<ArrowType> builder;
RETURN_NOT_OK(builder.AppendValues(values.data(), values.size()));
return builder.Finish(out);
@@ -200,8 +199,8 @@ typename std::enable_if<is_arrow_float<ArrowType>::value,
Status>::type Nullable
size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out)
{
using c_type = typename ArrowType::c_type;
std::vector<c_type> values;
- ::arrow::random_real(size, seed, static_cast<c_type>(-1e10),
- static_cast<c_type>(1e10), &values);
+ ::arrow::random_real(size, seed, static_cast<c_type>(-1e10),
static_cast<c_type>(1e10),
+ &values);
std::vector<uint8_t> valid_bytes(size, 1);
for (size_t i = 0; i < num_nulls; i++) {
diff --git a/src/parquet/column_writer-test.cc
b/src/parquet/column_writer-test.cc
index dd892817..e87d549b 100644
--- a/src/parquet/column_writer-test.cc
+++ b/src/parquet/column_writer-test.cc
@@ -47,7 +47,6 @@ const int LARGE_SIZE = 100000;
const int VERY_LARGE_SIZE = 400000;
#endif
-
template <typename TestType>
class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
public:
diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc
index 934530c0..a65bda85 100644
--- a/src/parquet/column_writer.cc
+++ b/src/parquet/column_writer.cc
@@ -249,6 +249,16 @@ class SerializedPageWriter : public PageWriter {
bool has_compressor() override { return (compressor_ != nullptr); }
+ int64_t num_values() { return num_values_; }
+
+ int64_t dictionary_page_offset() { return dictionary_page_offset_; }
+
+ int64_t data_page_offset() { return data_page_offset_; }
+
+ int64_t total_compressed_size() { return total_compressed_size_; }
+
+ int64_t total_uncompressed_size() { return total_uncompressed_size_; }
+
private:
OutputStream* sink_;
ColumnChunkMetaDataBuilder* metadata_;
@@ -263,11 +273,64 @@ class SerializedPageWriter : public PageWriter {
std::unique_ptr<::arrow::Codec> compressor_;
};
+// This implementation of the PageWriter writes to the final sink on Close().
+class BufferedPageWriter : public PageWriter {
+ public:
+ BufferedPageWriter(OutputStream* sink, Compression::type codec,
+ ColumnChunkMetaDataBuilder* metadata,
+ ::arrow::MemoryPool* pool =
::arrow::default_memory_pool())
+ : final_sink_(sink),
+ metadata_(metadata),
+ in_memory_sink_(new InMemoryOutputStream(pool)),
+ pager_(new SerializedPageWriter(in_memory_sink_.get(), codec,
metadata, pool)) {}
+
+ int64_t WriteDictionaryPage(const DictionaryPage& page) override {
+ return pager_->WriteDictionaryPage(page);
+ }
+
+ void Close(bool has_dictionary, bool fallback) override {
+ // index_page_offset = -1 since they are not supported
+ metadata_->Finish(
+ pager_->num_values(), pager_->dictionary_page_offset() +
final_sink_->Tell(), -1,
+ pager_->data_page_offset() + final_sink_->Tell(),
pager_->total_compressed_size(),
+ pager_->total_uncompressed_size(), has_dictionary, fallback);
+
+ // Write metadata at end of column chunk
+ metadata_->WriteTo(in_memory_sink_.get());
+
+ // flush everything to the serialized sink
+ auto buffer = in_memory_sink_->GetBuffer();
+ final_sink_->Write(buffer->data(), buffer->size());
+ }
+
+ int64_t WriteDataPage(const CompressedDataPage& page) override {
+ return pager_->WriteDataPage(page);
+ }
+
+ void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer)
override {
+ pager_->Compress(src_buffer, dest_buffer);
+ }
+
+ bool has_compressor() override { return pager_->has_compressor(); }
+
+ private:
+ OutputStream* final_sink_;
+ ColumnChunkMetaDataBuilder* metadata_;
+ std::unique_ptr<InMemoryOutputStream> in_memory_sink_;
+ std::unique_ptr<SerializedPageWriter> pager_;
+};
+
std::unique_ptr<PageWriter> PageWriter::Open(OutputStream* sink,
Compression::type codec,
ColumnChunkMetaDataBuilder*
metadata,
- ::arrow::MemoryPool* pool) {
- return std::unique_ptr<PageWriter>(
- new SerializedPageWriter(sink, codec, metadata, pool));
+ ::arrow::MemoryPool* pool,
+ bool buffered_row_group) {
+ if (buffered_row_group) {
+ return std::unique_ptr<PageWriter>(
+ new BufferedPageWriter(sink, codec, metadata, pool));
+ } else {
+ return std::unique_ptr<PageWriter>(
+ new SerializedPageWriter(sink, codec, metadata, pool));
+ }
}
// ----------------------------------------------------------------------
@@ -294,6 +357,7 @@ ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder*
metadata,
num_buffered_encoded_values_(0),
rows_written_(0),
total_bytes_written_(0),
+ total_compressed_bytes_(0),
closed_(false),
fallback_(false) {
definition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
@@ -404,6 +468,7 @@ void ColumnWriter::AddDataPage() {
CompressedDataPage page(compressed_data_copy,
static_cast<int32_t>(num_buffered_values_),
encoding_,
Encoding::RLE, Encoding::RLE, uncompressed_size,
page_stats);
+ total_compressed_bytes_ += page.size() + sizeof(format::PageHeader);
data_pages_.push_back(std::move(page));
} else { // Eagerly write pages
CompressedDataPage page(compressed_data,
static_cast<int32_t>(num_buffered_values_),
@@ -432,7 +497,7 @@ int64_t ColumnWriter::Close() {
FlushBufferedDataPages();
EncodedStatistics chunk_statistics = GetChunkStatistics();
- // Write stats only if the column has atleast one row written
+ // Write stats only if the column has at least one row written
// From parquet-mr
// Don't write stats larger than the max size rather than truncating. The
// rationale is that some engines may use the minimum value in the page as
@@ -459,6 +524,7 @@ void ColumnWriter::FlushBufferedDataPages() {
WriteDataPage(data_pages_[i]);
}
data_pages_.clear();
+ total_compressed_bytes_ = 0;
}
// ----------------------------------------------------------------------
diff --git a/src/parquet/column_writer.h b/src/parquet/column_writer.h
index 6b847480..1ba428a9 100644
--- a/src/parquet/column_writer.h
+++ b/src/parquet/column_writer.h
@@ -75,7 +75,8 @@ class PageWriter {
static std::unique_ptr<PageWriter> Open(
OutputStream* sink, Compression::type codec, ColumnChunkMetaDataBuilder*
metadata,
- ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+ ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
+ bool buffered_row_group = false);
// The Column Writer decides if dictionary encoding is used if set and
// if the dictionary encoding has fallen back to default encoding on
reaching dictionary
@@ -117,6 +118,12 @@ class PARQUET_EXPORT ColumnWriter {
int64_t rows_written() const { return rows_written_; }
+ // Only considers the size of the compressed pages + page header
+ // Some values might still be buffered and not written to a page yet
+ int64_t total_compressed_bytes() const { return total_compressed_bytes_; }
+
+ int64_t total_bytes_written() const { return total_bytes_written_; }
+
const WriterProperties* properties() { return properties_; }
protected:
@@ -192,6 +199,9 @@ class PARQUET_EXPORT ColumnWriter {
// Records the total number of bytes written by the serializer
int64_t total_bytes_written_;
+ // Records the current number of compressed bytes in a column
+ int64_t total_compressed_bytes_;
+
// Flag to check if the Writer has been closed
bool closed_;
@@ -258,6 +268,11 @@ class PARQUET_EXPORT TypedColumnWriter : public
ColumnWriter {
const int16_t* rep_levels, const uint8_t* valid_bits,
int64_t valid_bits_offset, const T* values);
+ // Estimated size of the values that are not written to a page yet
+ int64_t EstimatedBufferedValueBytes() const {
+ return current_encoder_->EstimatedDataEncodedSize();
+ }
+
protected:
std::shared_ptr<Buffer> GetValuesBuffer() override {
return current_encoder_->FlushValues();
diff --git a/src/parquet/file-serialize-test.cc
b/src/parquet/file-serialize-test.cc
index 19934041..750faa20 100644
--- a/src/parquet/file-serialize-test.cc
+++ b/src/parquet/file-serialize-test.cc
@@ -41,8 +41,9 @@ class TestSerialize : public PrimitiveTypedTest<TestType> {
void SetUp() {
num_columns_ = 4;
- num_rowgroups_ = 2;
+ num_rowgroups_ = 4;
rows_per_rowgroup_ = 50;
+ rows_per_batch_ = 10;
this->SetUpSchema(Repetition::OPTIONAL, num_columns_);
}
@@ -50,6 +51,7 @@ class TestSerialize : public PrimitiveTypedTest<TestType> {
int num_columns_;
int num_rowgroups_;
int rows_per_rowgroup_;
+ int rows_per_batch_;
void FileSerializeTest(Compression::type codec_type) {
std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
@@ -63,20 +65,44 @@ class TestSerialize : public PrimitiveTypedTest<TestType> {
std::shared_ptr<WriterProperties> writer_properties = prop_builder.build();
auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties);
- for (int rg = 0; rg < num_rowgroups_; ++rg) {
+ this->GenerateData(rows_per_rowgroup_);
+ for (int rg = 0; rg < num_rowgroups_ / 2; ++rg) {
RowGroupWriter* row_group_writer;
row_group_writer = file_writer->AppendRowGroup();
- this->GenerateData(rows_per_rowgroup_);
for (int col = 0; col < num_columns_; ++col) {
auto column_writer =
static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
column_writer->WriteBatch(rows_per_rowgroup_,
this->def_levels_.data(), nullptr,
this->values_ptr_);
column_writer->Close();
+ // Ensure column() API which is specific to BufferedRowGroup cannot be
called
+ ASSERT_THROW(row_group_writer->column(col), ParquetException);
}
row_group_writer->Close();
}
+ // Write half BufferedRowGroups
+ for (int rg = 0; rg < num_rowgroups_ / 2; ++rg) {
+ RowGroupWriter* row_group_writer;
+ row_group_writer = file_writer->AppendBufferedRowGroup();
+ for (int batch = 0; batch < (rows_per_rowgroup_ / rows_per_batch_);
++batch) {
+ for (int col = 0; col < num_columns_; ++col) {
+ auto column_writer =
+
static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
+ column_writer->WriteBatch(
+ rows_per_batch_, this->def_levels_.data() + (batch *
rows_per_batch_),
+ nullptr, this->values_ptr_ + (batch * rows_per_batch_));
+ // Ensure NextColumn() API which is specific to RowGroup cannot be
called
+ ASSERT_THROW(row_group_writer->NextColumn(), ParquetException);
+ }
+ }
+ for (int col = 0; col < num_columns_; ++col) {
+ auto column_writer =
+
static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
+ column_writer->Close();
+ }
+ row_group_writer->Close();
+ }
file_writer->Close();
auto buffer = sink->GetBuffer();
@@ -137,6 +163,30 @@ class TestSerialize : public PrimitiveTypedTest<TestType> {
file_writer->Close();
}
+ void UnequalNumRowsBuffered(int64_t max_rows,
+ const std::vector<int64_t> rows_per_column) {
+ std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
+ auto gnode = std::static_pointer_cast<GroupNode>(this->node_);
+
+ std::shared_ptr<WriterProperties> props =
WriterProperties::Builder().build();
+
+ auto file_writer = ParquetFileWriter::Open(sink, gnode, props);
+
+ RowGroupWriter* row_group_writer;
+ row_group_writer = file_writer->AppendBufferedRowGroup();
+
+ this->GenerateData(max_rows);
+ for (int col = 0; col < num_columns_; ++col) {
+ auto column_writer =
+
static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
+ column_writer->WriteBatch(rows_per_column[col],
this->def_levels_.data(), nullptr,
+ this->values_ptr_);
+ column_writer->Close();
+ }
+ row_group_writer->Close();
+ file_writer->Close();
+ }
+
void RepeatedUnequalRows() {
// Optional and repeated, so definition and repetition levels
this->SetUpSchema(Repetition::REPEATED);
@@ -186,15 +236,23 @@ class TestSerialize : public PrimitiveTypedTest<TestType>
{
auto file_writer = ParquetFileWriter::Open(sink, gnode, props);
RowGroupWriter* row_group_writer;
- row_group_writer = file_writer->AppendRowGroup();
+ row_group_writer = file_writer->AppendRowGroup();
for (int col = 0; col < num_columns_; ++col) {
auto column_writer =
static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
column_writer->Close();
}
+ row_group_writer->Close();
+ row_group_writer = file_writer->AppendBufferedRowGroup();
+ for (int col = 0; col < num_columns_; ++col) {
+ auto column_writer =
+
static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
+ column_writer->Close();
+ }
row_group_writer->Close();
+
file_writer->Close();
}
};
@@ -212,11 +270,13 @@ TYPED_TEST(TestSerialize, SmallFileUncompressed) {
TYPED_TEST(TestSerialize, TooFewRows) {
std::vector<int64_t> num_rows = {100, 100, 100, 99};
ASSERT_THROW(this->UnequalNumRows(100, num_rows), ParquetException);
+ ASSERT_THROW(this->UnequalNumRowsBuffered(100, num_rows), ParquetException);
}
TYPED_TEST(TestSerialize, TooManyRows) {
std::vector<int64_t> num_rows = {100, 100, 100, 101};
ASSERT_THROW(this->UnequalNumRows(101, num_rows), ParquetException);
+ ASSERT_THROW(this->UnequalNumRowsBuffered(101, num_rows), ParquetException);
}
TYPED_TEST(TestSerialize, ZeroRows) {
ASSERT_NO_THROW(this->ZeroRowsRowGroup()); }
diff --git a/src/parquet/file_writer.cc b/src/parquet/file_writer.cc
index 9b2d9b00..30673c59 100644
--- a/src/parquet/file_writer.cc
+++ b/src/parquet/file_writer.cc
@@ -17,6 +17,8 @@
#include "parquet/file_writer.h"
+#include <vector>
+
#include "parquet/column_writer.h"
#include "parquet/schema-internal.h"
#include "parquet/schema.h"
@@ -47,12 +49,28 @@ void RowGroupWriter::Close() {
ColumnWriter* RowGroupWriter::NextColumn() { return contents_->NextColumn(); }
+ColumnWriter* RowGroupWriter::column(int i) { return contents_->column(i); }
+
+int64_t RowGroupWriter::total_compressed_bytes() const {
+ return contents_->total_compressed_bytes();
+}
+
+int64_t RowGroupWriter::total_bytes_written() const {
+ return contents_->total_bytes_written();
+}
+
int RowGroupWriter::current_column() { return contents_->current_column(); }
int RowGroupWriter::num_columns() const { return contents_->num_columns(); }
int64_t RowGroupWriter::num_rows() const { return contents_->num_rows(); }
+inline void ThrowRowsMisMatchError(int col, int64_t prev, int64_t curr) {
+ std::stringstream ss;
+ ss << "Column " << col << " had " << curr << " while previous column had "
<< prev;
+ throw ParquetException(ss.str());
+}
+
// ----------------------------------------------------------------------
// RowGroupSerializer
@@ -60,34 +78,45 @@ int64_t RowGroupWriter::num_rows() const { return
contents_->num_rows(); }
class RowGroupSerializer : public RowGroupWriter::Contents {
public:
RowGroupSerializer(OutputStream* sink, RowGroupMetaDataBuilder* metadata,
- const WriterProperties* properties)
+ const WriterProperties* properties, bool
buffered_row_group = false)
: sink_(sink),
metadata_(metadata),
properties_(properties),
total_bytes_written_(0),
closed_(false),
current_column_index_(0),
- num_rows_(-1) {}
+ num_rows_(0),
+ buffered_row_group_(buffered_row_group) {
+ if (buffered_row_group) {
+ InitColumns();
+ } else {
+ column_writers_.push_back(nullptr);
+ }
+ }
int num_columns() const override { return metadata_->num_columns(); }
int64_t num_rows() const override {
- if (current_column_writer_) {
- CheckRowsWritten();
- }
- return num_rows_ < 0 ? 0 : num_rows_;
+ CheckRowsWritten();
+ // CheckRowsWritten ensures num_rows_ is set correctly
+ return num_rows_;
}
ColumnWriter* NextColumn() override {
- if (current_column_writer_) {
+ if (buffered_row_group_) {
+ throw ParquetException(
+ "NextColumn() is not supported when a RowGroup is written by size");
+ }
+
+ if (column_writers_[0]) {
CheckRowsWritten();
}
// Throws an error if more columns are being written
auto col_meta = metadata_->NextColumnChunk();
- if (current_column_writer_) {
- total_bytes_written_ += current_column_writer_->Close();
+ if (column_writers_[0]) {
+ total_bytes_written_ += column_writers_[0]->Close();
}
++current_column_index_;
@@ -96,23 +125,60 @@ class RowGroupSerializer : public RowGroupWriter::Contents
{
std::unique_ptr<PageWriter> pager =
PageWriter::Open(sink_,
properties_->compression(column_descr->path()), col_meta,
properties_->memory_pool());
- current_column_writer_ = ColumnWriter::Make(col_meta, std::move(pager),
properties_);
- return current_column_writer_.get();
+ column_writers_[0] = ColumnWriter::Make(col_meta, std::move(pager),
properties_);
+ return column_writers_[0].get();
+ }
+
+ ColumnWriter* column(int i) override {
+ if (!buffered_row_group_) {
+ throw ParquetException(
+ "column() is only supported when a BufferedRowGroup is being
written");
+ }
+
+ if (i >= 0 && i < static_cast<int>(column_writers_.size())) {
+ return column_writers_[i].get();
+ }
+ return nullptr;
}
int current_column() const override { return metadata_->current_column(); }
+ int64_t total_compressed_bytes() const override {
+ int64_t total_compressed_bytes = 0;
+ for (size_t i = 0; i < column_writers_.size(); i++) {
+ if (column_writers_[i]) {
+ total_compressed_bytes += column_writers_[i]->total_compressed_bytes();
+ }
+ }
+ return total_compressed_bytes;
+ }
+
+ int64_t total_bytes_written() const override {
+ int64_t total_bytes_written = 0;
+ for (size_t i = 0; i < column_writers_.size(); i++) {
+ if (column_writers_[i]) {
+ total_bytes_written += column_writers_[i]->total_bytes_written();
+ }
+ }
+ return total_bytes_written;
+ }
+
void Close() override {
if (!closed_) {
closed_ = true;
+ CheckRowsWritten();
- if (current_column_writer_) {
- CheckRowsWritten();
- total_bytes_written_ += current_column_writer_->Close();
- current_column_writer_.reset();
+ for (size_t i = 0; i < column_writers_.size(); i++) {
+ if (column_writers_[i]) {
+ total_bytes_written_ += column_writers_[i]->Close();
+ column_writers_[i].reset();
+ }
}
+ column_writers_.clear();
+
// Ensures all columns have been written
+ metadata_->set_num_rows(num_rows_);
metadata_->Finish(total_bytes_written_);
}
}
@@ -125,21 +191,43 @@ class RowGroupSerializer : public
RowGroupWriter::Contents {
bool closed_;
int current_column_index_;
mutable int64_t num_rows_;
+ bool buffered_row_group_;
void CheckRowsWritten() const {
- int64_t current_rows = current_column_writer_->rows_written();
- if (num_rows_ < 0) {
- num_rows_ = current_rows;
- metadata_->set_num_rows(current_rows);
- } else if (num_rows_ != current_rows) {
- std::stringstream ss;
- ss << "Column " << current_column_index_ << " had " << current_rows
- << " while previous column had " << num_rows_;
- throw ParquetException(ss.str());
+ // verify when only one column is written at a time
+ if (!buffered_row_group_ && column_writers_.size() > 0 &&
column_writers_[0]) {
+ int64_t current_col_rows = column_writers_[0]->rows_written();
+ if (num_rows_ == 0) {
+ num_rows_ = current_col_rows;
+ } else if (num_rows_ != current_col_rows) {
+ ThrowRowsMisMatchError(current_column_index_, current_col_rows,
num_rows_);
+ }
+ } else if (buffered_row_group_ &&
+ column_writers_.size() > 0) { // when buffered_row_group = true
+ int64_t current_col_rows = column_writers_[0]->rows_written();
+ for (int i = 1; i < static_cast<int>(column_writers_.size()); i++) {
+ int64_t current_col_rows_i = column_writers_[i]->rows_written();
+ if (current_col_rows != current_col_rows_i) {
+ ThrowRowsMisMatchError(i, current_col_rows_i, current_col_rows);
+ }
+ }
+ num_rows_ = current_col_rows;
+ }
+ }
+
+ void InitColumns() {
+ for (int i = 0; i < num_columns(); i++) {
+ auto col_meta = metadata_->NextColumnChunk();
+ const ColumnDescriptor* column_descr = col_meta->descr();
+ std::unique_ptr<PageWriter> pager =
+ PageWriter::Open(sink_,
properties_->compression(column_descr->path()),
+ col_meta, properties_->memory_pool(),
buffered_row_group_);
+ column_writers_.push_back(
+ ColumnWriter::Make(col_meta, std::move(pager), properties_));
}
}
- std::shared_ptr<ColumnWriter> current_column_writer_;
+ std::vector<std::shared_ptr<ColumnWriter>> column_writers_;
};
// ----------------------------------------------------------------------
@@ -187,18 +275,22 @@ class FileSerializer : public ParquetFileWriter::Contents
{
return properties_;
}
- RowGroupWriter* AppendRowGroup() override {
+ RowGroupWriter* AppendRowGroup(bool buffered_row_group) {
if (row_group_writer_) {
row_group_writer_->Close();
}
num_row_groups_++;
auto rg_metadata = metadata_->AppendRowGroup();
- std::unique_ptr<RowGroupWriter::Contents> contents(
- new RowGroupSerializer(sink_.get(), rg_metadata, properties_.get()));
+ std::unique_ptr<RowGroupWriter::Contents> contents(new RowGroupSerializer(
+ sink_.get(), rg_metadata, properties_.get(), buffered_row_group));
row_group_writer_.reset(new RowGroupWriter(std::move(contents)));
return row_group_writer_.get();
}
+ RowGroupWriter* AppendRowGroup() override { return AppendRowGroup(false); }
+
+ RowGroupWriter* AppendBufferedRowGroup() override { return
AppendRowGroup(true); }
+
~FileSerializer() override {
try {
Close();
@@ -227,6 +319,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
int num_row_groups_;
int64_t num_rows_;
std::unique_ptr<FileMetaDataBuilder> metadata_;
+ // Only one of the row group writers is active at a time
std::unique_ptr<RowGroupWriter> row_group_writer_;
void StartFile() {
@@ -311,6 +404,10 @@ RowGroupWriter* ParquetFileWriter::AppendRowGroup() {
return contents_->AppendRowGroup();
}
+RowGroupWriter* ParquetFileWriter::AppendBufferedRowGroup() {
+ return contents_->AppendBufferedRowGroup();
+}
+
RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
return AppendRowGroup();
}
diff --git a/src/parquet/file_writer.h b/src/parquet/file_writer.h
index de179824..cdfe06cd 100644
--- a/src/parquet/file_writer.h
+++ b/src/parquet/file_writer.h
@@ -49,15 +49,25 @@ class PARQUET_EXPORT RowGroupWriter {
virtual int num_columns() const = 0;
virtual int64_t num_rows() const = 0;
+ // to be used only with ParquetFileWriter::AppendRowGroup
virtual ColumnWriter* NextColumn() = 0;
+ // to be used only with ParquetFileWriter::AppendBufferedRowGroup
+ virtual ColumnWriter* column(int i) = 0;
+
virtual int current_column() const = 0;
virtual void Close() = 0;
+
+ // total bytes written by the page writer
+ virtual int64_t total_bytes_written() const = 0;
+ // total bytes still compressed but not written
+ virtual int64_t total_compressed_bytes() const = 0;
};
explicit RowGroupWriter(std::unique_ptr<Contents> contents);
/// Construct a ColumnWriter for the indicated row group-relative column.
///
+ /// To be used only with ParquetFileWriter::AppendRowGroup
/// Ownership is solely within the RowGroupWriter. The ColumnWriter is only
/// valid until the next call to NextColumn or Close. As the contents are
/// directly written to the sink, once a new column is started, the contents
@@ -69,11 +79,22 @@ class PARQUET_EXPORT RowGroupWriter {
int num_columns() const;
+ /// Construct a ColumnWriter for the indicated row group column.
+ ///
+ /// To be used only with ParquetFileWriter::AppendBufferedRowGroup
+ /// Ownership is solely within the RowGroupWriter. The ColumnWriter is
+ /// valid until Close. The contents are buffered in memory and written to
sink
+ /// on Close
+ ColumnWriter* column(int i);
+
/**
* Number of rows that shall be written as part of this RowGroup.
*/
int64_t num_rows() const;
+ int64_t total_bytes_written() const;
+ int64_t total_compressed_bytes() const;
+
private:
// Holds a pointer to an instance of Contents implementation
std::unique_ptr<Contents> contents_;
@@ -101,6 +122,7 @@ class PARQUET_EXPORT ParquetFileWriter {
RowGroupWriter* AppendRowGroup(int64_t num_rows);
virtual RowGroupWriter* AppendRowGroup() = 0;
+ virtual RowGroupWriter* AppendBufferedRowGroup() = 0;
virtual int64_t num_rows() const = 0;
virtual int num_columns() const = 0;
@@ -142,7 +164,7 @@ class PARQUET_EXPORT ParquetFileWriter {
// Construct a RowGroupWriter for the indicated number of rows.
//
// Ownership is solely within the ParquetFileWriter. The RowGroupWriter is
only valid
- // until the next call to AppendRowGroup or Close.
+ // until the next call to AppendRowGroup or AppendBufferedRowGroup or Close.
// @param num_rows The number of rows that are stored in the new RowGroup
//
// \deprecated Since 1.3.0
@@ -151,9 +173,16 @@ class PARQUET_EXPORT ParquetFileWriter {
/// Construct a RowGroupWriter with an arbitrary number of rows.
///
/// Ownership is solely within the ParquetFileWriter. The RowGroupWriter is
only valid
- /// until the next call to AppendRowGroup or Close.
+ /// until the next call to AppendRowGroup or AppendBufferedRowGroup or Close.
RowGroupWriter* AppendRowGroup();
+ /// Construct a RowGroupWriter that buffers all the values until the
RowGroup is ready.
+ /// Use this if you want to write a RowGroup based on a certain size
+ ///
+ /// Ownership is solely within the ParquetFileWriter. The RowGroupWriter is
only valid
+ /// until the next call to AppendRowGroup or AppendBufferedRowGroup or Close.
+ RowGroupWriter* AppendBufferedRowGroup();
+
/// Number of columns.
///
/// This number is fixed during the lifetime of the writer as it is
determined via
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> [C++] Add an API to allow writing RowGroups based on their size rather than
> num_rows
> ------------------------------------------------------------------------------------
>
> Key: PARQUET-1372
> URL: https://issues.apache.org/jira/browse/PARQUET-1372
> Project: Parquet
> Issue Type: Task
> Reporter: Anatoli Shein
> Assignee: Deepak Majeti
> Priority: Major
> Labels: pull-request-available
> Fix For: cpp-1.5.0
>
>
> The current API allows writing RowGroups with a specified number of rows;
> however, it does not allow writing RowGroups with a specified size. In order to
> write RowGroups of specified size we need to write rows in chunks while
> checking the total_bytes_written after each chunk is written. This is
> currently impossible because the call to NextColumn() closes the current
> column writer.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)