This is an automated email from the ASF dual-hosted git repository.
lixueclaire pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-graphar.git
The following commit(s) were added to refs/heads/main by this push:
new 961c7b89 feat(c++): Add WriterOption to allow user to configure writer option (#700)
961c7b89 is described below
commit 961c7b89df91b575ee52cba05451a4863f9ca4f1
Author: Xiaokang Yang <[email protected]>
AuthorDate: Fri Jun 27 19:15:20 2025 +0800
feat(c++): Add WriterOption to allow user to configure writer option (#700)
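
    Example usage, as a minimal sketch based on the API added in this patch
    (vertex_info and the "/tmp/" output prefix are illustrative):

        #include "graphar/api/arrow_writer.h"
        #include "graphar/writer_util.h"

        // Build Parquet writer options with ZSTD compression via the new builder.
        auto builder = graphar::WriterOptions::ParquetOptionBuilder();
        builder.compression(arrow::Compression::ZSTD);
        // Pass the resulting WriterOptions when constructing the writer.
        auto maybe_writer = graphar::VertexPropertyWriter::Make(
            vertex_info, "/tmp/", builder.build());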
---
cli/CMakeLists.txt | 7 +-
cpp/examples/mid_level_writer_example.cc | 9 +-
cpp/src/graphar/arrow/chunk_writer.cc | 73 ++++-
cpp/src/graphar/arrow/chunk_writer.h | 33 +++
cpp/src/graphar/filesystem.cc | 27 +-
cpp/src/graphar/filesystem.h | 6 +-
cpp/src/graphar/high-level/edges_builder.cc | 3 +-
cpp/src/graphar/high-level/vertices_builder.h | 5 +-
cpp/src/graphar/writer_util.cc | 129 +++++++++
cpp/src/graphar/writer_util.h | 392 +++++++++++++++++++++++++-
cpp/test/test_arrow_chunk_writer.cc | 311 +++++++++++++++++---
cpp/test/test_builder.cc | 2 +-
docs/libraries/cpp/getting-started.md | 4 +-
13 files changed, 923 insertions(+), 78 deletions(-)
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index 3639bbf4..21caa61f 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -37,7 +37,12 @@ find_package(Arrow REQUIRED)
find_package(ArrowDataset REQUIRED)
find_package(ArrowAcero REQUIRED)
find_package(Parquet REQUIRED)
-
+# Check if ORC is enabled.
+if (NOT ${ARROW_ORC})
+  message(WARNING "apache-arrow is built without ORC extension, ORC related functionalities will be disabled.")
+else()
+  add_definitions(-DARROW_ORC) # Add macro, otherwise inconsistent in build phase on ubuntu.
+endif()
# Add a library using FindPython's tooling (pybind11 also provides a helper like
# this)
python_add_library(_core MODULE src/main.cc WITH_SOABI)
diff --git a/cpp/examples/mid_level_writer_example.cc b/cpp/examples/mid_level_writer_example.cc
index 3b25d9fb..7ecfad8d 100644
--- a/cpp/examples/mid_level_writer_example.cc
+++ b/cpp/examples/mid_level_writer_example.cc
@@ -17,6 +17,7 @@
* under the License.
*/
+#include <arrow/util/type_fwd.h>
#include <iostream>
#include "arrow/api.h"
@@ -25,6 +26,8 @@
#include "./config.h"
#include "graphar/api/arrow_writer.h"
+#include "graphar/fwd.h"
+#include "graphar/writer_util.h"
arrow::Result<std::shared_ptr<arrow::Table>> generate_vertex_table() {
// property "id"
@@ -108,8 +111,10 @@ void vertex_property_writer(
auto vertex_meta = graphar::Yaml::LoadFile(vertex_meta_file).value();
auto vertex_info = graphar::VertexInfo::Load(vertex_meta).value();
ASSERT(vertex_info->GetType() == "person");
-
-  auto maybe_writer = graphar::VertexPropertyWriter::Make(vertex_info, "/tmp/");
+ auto builder = graphar::WriterOptions::ParquetOptionBuilder();
+ builder.compression(arrow::Compression::ZSTD);
+ auto maybe_writer = graphar::VertexPropertyWriter::Make(vertex_info, "/tmp/",
+                                                          builder.build());
ASSERT(maybe_writer.status().ok());
auto writer = maybe_writer.value();
diff --git a/cpp/src/graphar/arrow/chunk_writer.cc b/cpp/src/graphar/arrow/chunk_writer.cc
index 964886b0..234bbfb3 100644
--- a/cpp/src/graphar/arrow/chunk_writer.cc
+++ b/cpp/src/graphar/arrow/chunk_writer.cc
@@ -17,11 +17,13 @@
* under the License.
*/
+#include <cstddef>
#include <unordered_map>
#include <utility>
#include "arrow/api.h"
#include "arrow/compute/api.h"
+#include "graphar/writer_util.h"
#if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
#include "arrow/acero/exec_plan.h"
#else
@@ -101,10 +103,15 @@ Result<std::shared_ptr<arrow::Table>> ExecutePlanAndCollectAsTable(
VertexPropertyWriter::VertexPropertyWriter(
const std::shared_ptr<VertexInfo>& vertex_info, const std::string& prefix,
+ const std::shared_ptr<WriterOptions>& options,
const ValidateLevel& validate_level)
: vertex_info_(vertex_info),
prefix_(prefix),
- validate_level_(validate_level) {
+ validate_level_(validate_level),
+ options_(options) {
+ if (!options) {
+ options_ = WriterOptions::DefaultWriterOption();
+ }
if (validate_level_ == ValidateLevel::default_validate) {
throw std::runtime_error(
"default_validate is not allowed to be set as the global validate "
@@ -237,7 +244,7 @@ Status VertexPropertyWriter::WriteChunk(
GAR_ASSIGN_OR_RAISE(auto suffix,
vertex_info_->GetFilePath(property_group, chunk_index));
std::string path = prefix_ + suffix;
- return fs_->WriteTableToFile(in_table, file_type, path);
+ return fs_->WriteTableToFile(in_table, file_type, path, options_);
}
Status VertexPropertyWriter::WriteChunk(
@@ -395,11 +402,11 @@ Result<std::shared_ptr<arrow::Table>> VertexPropertyWriter::GetLabelTable(
for (const auto& label : labels) {
arrow::BooleanBuilder builder;
for (const auto& row : bool_matrix) {
- builder.Append(row[label_to_index[label]]);
+ RETURN_NOT_ARROW_OK(builder.Append(row[label_to_index[label]]));
}
std::shared_ptr<arrow::Array> array;
- builder.Finish(&array);
+ RETURN_NOT_ARROW_OK(builder.Finish(&array));
fields.push_back(arrow::field(label, arrow::boolean()));
arrays.push_back(array);
}
@@ -413,19 +420,35 @@ Result<std::shared_ptr<arrow::Table>> VertexPropertyWriter::GetLabelTable(
Result<std::shared_ptr<VertexPropertyWriter>> VertexPropertyWriter::Make(
const std::shared_ptr<VertexInfo>& vertex_info, const std::string& prefix,
+ const std::shared_ptr<WriterOptions>& options,
const ValidateLevel& validate_level) {
- return std::make_shared<VertexPropertyWriter>(vertex_info, prefix,
+ return std::make_shared<VertexPropertyWriter>(vertex_info, prefix, options,
validate_level);
}
Result<std::shared_ptr<VertexPropertyWriter>> VertexPropertyWriter::Make(
const std::shared_ptr<GraphInfo>& graph_info, const std::string& type,
+ const std::shared_ptr<WriterOptions>& options,
const ValidateLevel& validate_level) {
auto vertex_info = graph_info->GetVertexInfo(type);
if (!vertex_info) {
return Status::KeyError("The vertex ", type, " doesn't exist.");
}
- return Make(vertex_info, graph_info->GetPrefix(), validate_level);
+ return Make(vertex_info, graph_info->GetPrefix(), options, validate_level);
+}
+
+Result<std::shared_ptr<VertexPropertyWriter>> VertexPropertyWriter::Make(
+ const std::shared_ptr<VertexInfo>& vertex_info, const std::string& prefix,
+ const ValidateLevel& validate_level) {
+ return Make(vertex_info, prefix, WriterOptions::DefaultWriterOption(),
+ validate_level);
+}
+
+Result<std::shared_ptr<VertexPropertyWriter>> VertexPropertyWriter::Make(
+ const std::shared_ptr<GraphInfo>& graph_info, const std::string& type,
+ const ValidateLevel& validate_level) {
+ return Make(graph_info, type, WriterOptions::DefaultWriterOption(),
+ validate_level);
}
Result<std::shared_ptr<arrow::Table>> VertexPropertyWriter::AddIndexColumn(
@@ -454,10 +477,15 @@ Result<std::shared_ptr<arrow::Table>> VertexPropertyWriter::AddIndexColumn(
EdgeChunkWriter::EdgeChunkWriter(const std::shared_ptr<EdgeInfo>& edge_info,
const std::string& prefix,
AdjListType adj_list_type,
+ const std::shared_ptr<WriterOptions>& options,
const ValidateLevel& validate_level)
: edge_info_(edge_info),
adj_list_type_(adj_list_type),
- validate_level_(validate_level) {
+ validate_level_(validate_level),
+ options_(options) {
+ if (!options) {
+ options_ = WriterOptions::DefaultWriterOption();
+ }
if (validate_level_ == ValidateLevel::default_validate) {
throw std::runtime_error(
"default_validate is not allowed to be set as the global validate "
@@ -715,7 +743,7 @@ Status EdgeChunkWriter::WriteOffsetChunk(
GAR_ASSIGN_OR_RAISE(auto suffix, edge_info_->GetAdjListOffsetFilePath(
vertex_chunk_index, adj_list_type_));
std::string path = prefix_ + suffix;
- return fs_->WriteTableToFile(in_table, file_type, path);
+ return fs_->WriteTableToFile(in_table, file_type, path, options_);
}
Status EdgeChunkWriter::WriteAdjListChunk(
@@ -747,7 +775,7 @@ Status EdgeChunkWriter::WriteAdjListChunk(
auto suffix, edge_info_->GetAdjListFilePath(vertex_chunk_index,
chunk_index,
adj_list_type_));
std::string path = prefix_ + suffix;
- return fs_->WriteTableToFile(in_table, file_type, path);
+ return fs_->WriteTableToFile(in_table, file_type, path, options_);
}
Status EdgeChunkWriter::WritePropertyChunk(
@@ -777,7 +805,7 @@ Status EdgeChunkWriter::WritePropertyChunk(
property_group, adj_list_type_,
vertex_chunk_index, chunk_index));
std::string path = prefix_ + suffix;
- return fs_->WriteTableToFile(in_table, file_type, path);
+ return fs_->WriteTableToFile(in_table, file_type, path, options_);
}
Status EdgeChunkWriter::WritePropertyChunk(
@@ -1001,29 +1029,46 @@ Result<std::shared_ptr<arrow::Table>> EdgeChunkWriter::sortTable(
Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make(
const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
- AdjListType adj_list_type, const ValidateLevel& validate_level) {
+ AdjListType adj_list_type, const std::shared_ptr<WriterOptions>& options,
+ const ValidateLevel& validate_level) {
if (!edge_info->HasAdjacentListType(adj_list_type)) {
return Status::KeyError(
"The adjacent list type ", AdjListTypeToString(adj_list_type),
" doesn't exist in edge ", edge_info->GetEdgeType(), ".");
}
return std::make_shared<EdgeChunkWriter>(edge_info, prefix, adj_list_type,
- validate_level);
+ options, validate_level);
+}
+
+Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make(
+ const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
+ AdjListType adj_list_type, const ValidateLevel& validate_level) {
+ return Make(edge_info, prefix, adj_list_type,
+ WriterOptions::DefaultWriterOption(), validate_level);
}
Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make(
const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_type,
const std::string& edge_type, const std::string& dst_type,
- AdjListType adj_list_type, const ValidateLevel& validate_level) {
+ AdjListType adj_list_type, const std::shared_ptr<WriterOptions>& options,
+ const ValidateLevel& validate_level) {
auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type);
if (!edge_info) {
return Status::KeyError("The edge ", src_type, " ", edge_type, " ",
dst_type, " doesn't exist.");
}
- return Make(edge_info, graph_info->GetPrefix(), adj_list_type,
+ return Make(edge_info, graph_info->GetPrefix(), adj_list_type, options,
validate_level);
}
+Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make(
+ const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_type,
+ const std::string& edge_type, const std::string& dst_type,
+ AdjListType adj_list_type, const ValidateLevel& validate_level) {
+ return Make(graph_info, src_type, edge_type, dst_type, adj_list_type,
+ WriterOptions::DefaultWriterOption(), validate_level);
+}
+
std::string EdgeChunkWriter::getSortColumnName(AdjListType adj_list_type) {
switch (adj_list_type) {
case AdjListType::unordered_by_source:
diff --git a/cpp/src/graphar/arrow/chunk_writer.h b/cpp/src/graphar/arrow/chunk_writer.h
index 89d33126..76066f28 100644
--- a/cpp/src/graphar/arrow/chunk_writer.h
+++ b/cpp/src/graphar/arrow/chunk_writer.h
@@ -68,6 +68,8 @@ class VertexPropertyWriter {
*/
explicit VertexPropertyWriter(
const std::shared_ptr<VertexInfo>& vertex_info, const std::string& prefix,
+ const std::shared_ptr<WriterOptions>& options =
+ WriterOptions::DefaultWriterOption(),
const ValidateLevel& validate_level = ValidateLevel::no_validate);
/**
@@ -214,7 +216,13 @@ class VertexPropertyWriter {
* @param prefix The absolute prefix.
* @param validate_level The global validate level for the writer, default is
* no_validate.
+ * @param options Options for writing the table, such as compression.
*/
+ static Result<std::shared_ptr<VertexPropertyWriter>> Make(
+      const std::shared_ptr<VertexInfo>& vertex_info, const std::string& prefix,
+ const std::shared_ptr<WriterOptions>& options,
+ const ValidateLevel& validate_level = ValidateLevel::no_validate);
+
static Result<std::shared_ptr<VertexPropertyWriter>> Make(
const std::shared_ptr<VertexInfo>& vertex_info, const std::string& prefix,
const ValidateLevel& validate_level = ValidateLevel::no_validate);
@@ -226,11 +234,21 @@ class VertexPropertyWriter {
* @param type The vertex type.
* @param validate_level The global validate level for the writer, default is
* no_validate.
+ * @param options Options for writing the table, such as compression.
*/
+ static Result<std::shared_ptr<VertexPropertyWriter>> Make(
+ const std::shared_ptr<GraphInfo>& graph_info, const std::string& type,
+ const std::shared_ptr<WriterOptions>& options,
+ const ValidateLevel& validate_level = ValidateLevel::no_validate);
+
static Result<std::shared_ptr<VertexPropertyWriter>> Make(
const std::shared_ptr<GraphInfo>& graph_info, const std::string& type,
const ValidateLevel& validate_level = ValidateLevel::no_validate);
+ void setWriterOptions(const std::shared_ptr<WriterOptions>& options) {
+ options_ = options;
+ }
+
Result<std::shared_ptr<arrow::Table>> AddIndexColumn(
const std::shared_ptr<arrow::Table>& table, IdType chunk_index,
IdType chunk_size) const;
@@ -274,6 +292,7 @@ class VertexPropertyWriter {
std::string prefix_;
std::shared_ptr<FileSystem> fs_;
ValidateLevel validate_level_;
+ std::shared_ptr<WriterOptions> options_;
};
/**
@@ -314,6 +333,8 @@ class EdgeChunkWriter {
explicit EdgeChunkWriter(
const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
AdjListType adj_list_type,
+ const std::shared_ptr<WriterOptions>& options =
+ WriterOptions::DefaultWriterOption(),
const ValidateLevel& validate_level = ValidateLevel::no_validate);
/**
@@ -584,6 +605,11 @@ class EdgeChunkWriter {
* @param validate_level The global validate level for the writer, default is
* no_validate.
*/
+ static Result<std::shared_ptr<EdgeChunkWriter>> Make(
+ const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
+ AdjListType adj_list_type, const std::shared_ptr<WriterOptions>& options,
+ const ValidateLevel& validate_level = ValidateLevel::no_validate);
+
static Result<std::shared_ptr<EdgeChunkWriter>> Make(
const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
AdjListType adj_list_type,
@@ -600,6 +626,12 @@ class EdgeChunkWriter {
* @param validate_level The global validate level for the writer, default is
* no_validate.
*/
+ static Result<std::shared_ptr<EdgeChunkWriter>> Make(
+      const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_type,
+ const std::string& edge_type, const std::string& dst_type,
+ AdjListType adj_list_type, const std::shared_ptr<WriterOptions>& options,
+ const ValidateLevel& validate_level = ValidateLevel::no_validate);
+
static Result<std::shared_ptr<EdgeChunkWriter>> Make(
const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_type,
const std::string& edge_type, const std::string& dst_type,
@@ -714,6 +746,7 @@ class EdgeChunkWriter {
std::string prefix_;
std::shared_ptr<FileSystem> fs_;
ValidateLevel validate_level_;
+ std::shared_ptr<WriterOptions> options_;
};
} // namespace graphar
diff --git a/cpp/src/graphar/filesystem.cc b/cpp/src/graphar/filesystem.cc
index e246eeb4..abc6d975 100644
--- a/cpp/src/graphar/filesystem.cc
+++ b/cpp/src/graphar/filesystem.cc
@@ -17,6 +17,8 @@
* under the License.
*/
+#include <memory>
+#include "graphar/writer_util.h"
#ifdef ARROW_ORC
#include "arrow/adapters/orc/adapter.h"
#endif
@@ -243,42 +245,37 @@ Status FileSystem::WriteValueToFile(const std::string& value,
return Status::OK();
}
-Status FileSystem::WriteTableToFile(const std::shared_ptr<arrow::Table>& table,
- FileType file_type,
- const std::string& path) const noexcept {
+Status FileSystem::WriteTableToFile(
+ const std::shared_ptr<arrow::Table>& table, FileType file_type,
+ const std::string& path,
+ const std::shared_ptr<WriterOptions>& options) const noexcept {
// try to create the directory, oss filesystem may not support this, ignore
ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto output_stream,
arrow_fs_->OpenOutputStream(path));
switch (file_type) {
case FileType::CSV: {
- auto write_options = arrow::csv::WriteOptions::Defaults();
- write_options.include_header = true;
- write_options.quoting_style = arrow::csv::QuotingStyle::Needed;
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
- auto writer, arrow::csv::MakeCSVWriter(output_stream.get(),
-                                                 table->schema(), write_options));
+ auto writer,
+ arrow::csv::MakeCSVWriter(output_stream.get(), table->schema(),
+ options->getCsvOption()));
RETURN_NOT_ARROW_OK(writer->WriteTable(*table));
RETURN_NOT_ARROW_OK(writer->Close());
break;
}
case FileType::PARQUET: {
auto schema = table->schema();
- auto column_num = schema->num_fields();
- parquet::WriterProperties::Builder builder;
- builder.compression(arrow::Compression::type::ZSTD); // enable compression
RETURN_NOT_ARROW_OK(parquet::arrow::WriteTable(
*table, arrow::default_memory_pool(), output_stream, 64 * 1024 * 1024,
- builder.build(), parquet::default_arrow_writer_properties()));
+ options->getParquetWriterProperties(),
+ options->getArrowWriterProperties()));
break;
}
#ifdef ARROW_ORC
case FileType::ORC: {
- auto writer_options = arrow::adapters::orc::WriteOptions();
- writer_options.compression = arrow::Compression::type::ZSTD;
GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
auto writer, arrow::adapters::orc::ORCFileWriter::Open(
- output_stream.get(), writer_options));
+ output_stream.get(), options->getOrcOption()));
RETURN_NOT_ARROW_OK(writer->Write(*table));
RETURN_NOT_ARROW_OK(writer->Close());
break;
diff --git a/cpp/src/graphar/filesystem.h b/cpp/src/graphar/filesystem.h
index 986b7d59..5c78c4bc 100644
--- a/cpp/src/graphar/filesystem.h
+++ b/cpp/src/graphar/filesystem.h
@@ -20,7 +20,6 @@
#pragma once
#include <memory>
-#include <optional>
#include <string>
#include <vector>
@@ -30,6 +29,7 @@
#include "graphar/util.h"
#include "graphar/reader_util.h"
+#include "graphar/writer_util.h"
// forward declarations
namespace arrow {
@@ -112,10 +112,12 @@ class FileSystem {
* @param input_table The table to write.
* @param file_type The type of the output file.
* @param path The path of the output file.
+ * @param options Options for writing the table, such as compression.
* @return A Status indicating OK if successful, or an error if unsuccessful.
*/
Status WriteTableToFile(const std::shared_ptr<arrow::Table>& table,
- FileType file_type, const std::string& path) const
+ FileType file_type, const std::string& path,
+ const std::shared_ptr<WriterOptions>& options) const
noexcept;
/**
diff --git a/cpp/src/graphar/high-level/edges_builder.cc b/cpp/src/graphar/high-level/edges_builder.cc
index 74352274..38141fb2 100644
--- a/cpp/src/graphar/high-level/edges_builder.cc
+++ b/cpp/src/graphar/high-level/edges_builder.cc
@@ -28,7 +28,8 @@ namespace graphar::builder {
Status EdgesBuilder::Dump() {
// construct the writer
- EdgeChunkWriter writer(edge_info_, prefix_, adj_list_type_, validate_level_);
+ EdgeChunkWriter writer(edge_info_, prefix_, adj_list_type_, nullptr,
+ validate_level_);
// construct empty edge collections for vertex chunks without edges
IdType num_vertex_chunks =
(num_vertices_ + vertex_chunk_size_ - 1) / vertex_chunk_size_;
diff --git a/cpp/src/graphar/high-level/vertices_builder.h b/cpp/src/graphar/high-level/vertices_builder.h
index 7ccf8e35..55b505d1 100644
--- a/cpp/src/graphar/high-level/vertices_builder.h
+++ b/cpp/src/graphar/high-level/vertices_builder.h
@@ -20,6 +20,7 @@
#pragma once
#include <any>
+#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>
@@ -241,7 +242,9 @@ class VerticesBuilder {
*/
Status Dump() {
// construct the writer
- VertexPropertyWriter writer(vertex_info_, prefix_, validate_level_);
+ // TODO(yangxk) Allow users to use custom options instead of nullptr
+ VertexPropertyWriter writer(vertex_info_, prefix_, nullptr,
+ validate_level_);
IdType start_chunk_index =
start_vertex_index_ / vertex_info_->GetChunkSize();
// convert to table
diff --git a/cpp/src/graphar/writer_util.cc b/cpp/src/graphar/writer_util.cc
new file mode 100644
index 00000000..380e3df0
--- /dev/null
+++ b/cpp/src/graphar/writer_util.cc
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "graphar/writer_util.h"
+namespace graphar {
+arrow::csv::WriteOptions WriterOptions::getCsvOption() const {
+ if (csvOption_) {
+ arrow::csv::WriteOptions csvWriteOptions;
+ csvWriteOptions.include_header = csvOption_->include_header;
+ csvWriteOptions.batch_size = csvOption_->batch_size;
+ csvWriteOptions.delimiter = csvOption_->delimiter;
+ csvWriteOptions.null_string = csvOption_->null_string;
+ csvWriteOptions.io_context = csvOption_->io_context;
+ csvWriteOptions.eol = csvOption_->eol;
+ csvWriteOptions.quoting_style = csvOption_->quoting_style;
+ return csvWriteOptions;
+ } else {
+ return arrow::csv::WriteOptions::Defaults();
+ }
+}
+
+std::shared_ptr<parquet::WriterProperties>
+WriterOptions::getParquetWriterProperties() const {
+ parquet::WriterProperties::Builder builder;
+ if (parquetOption_) {
+ builder
+ .dictionary_pagesize_limit(parquetOption_->dictionary_pagesize_limit)
+ ->write_batch_size(parquetOption_->write_batch_size)
+ ->max_row_group_length(parquetOption_->max_row_group_length)
+ ->data_pagesize(parquetOption_->data_pagesize)
+ ->data_page_version(parquetOption_->data_page_version)
+ ->version(parquetOption_->version)
+ ->encoding(parquetOption_->encoding)
+ ->max_statistics_size(parquetOption_->max_statistics_size)
+ ->compression(parquetOption_->compression)
+ ->compression_level(parquetOption_->compression_level);
+ for (const auto& kv : parquetOption_->column_encoding) {
+ builder.encoding(kv.first, kv.second);
+ }
+ for (const auto& kv : parquetOption_->column_compression) {
+ builder.compression(kv.first, kv.second);
+ }
+ for (const auto& kv : parquetOption_->column_compression_level) {
+ builder.compression_level(kv.first, kv.second);
+ }
+ if (!parquetOption_->enable_dictionary) {
+ builder.disable_dictionary();
+ }
+ if (parquetOption_->encryption_properties) {
+ builder.encryption(parquetOption_->encryption_properties);
+ }
+ if (!parquetOption_->enable_statistics) {
+ builder.disable_statistics();
+ }
+ for (const auto& path_st : parquetOption_->column_statistics) {
+ if (!path_st.second) {
+ builder.disable_statistics(path_st.first);
+ }
+ }
+ if (!parquetOption_->sorting_columns.empty()) {
+ builder.set_sorting_columns(parquetOption_->sorting_columns);
+ }
+ if (parquetOption_->enable_store_decimal_as_integer) {
+ builder.enable_store_decimal_as_integer();
+ }
+ if (parquetOption_->enable_write_page_index) {
+ builder.enable_write_page_index();
+ }
+ }
+ return builder.build();
+}
+
+std::shared_ptr<parquet::ArrowWriterProperties>
+WriterOptions::getArrowWriterProperties() const {
+ parquet::ArrowWriterProperties::Builder builder;
+ if (parquetOption_) {
+ if (!parquetOption_->compliant_nested_types) {
+ builder.disable_compliant_nested_types();
+ }
+ builder.set_use_threads(parquetOption_->use_threads);
+ if (parquetOption_->enable_deprecated_int96_timestamps) {
+ builder.enable_deprecated_int96_timestamps();
+ }
+ builder.coerce_timestamps(parquetOption_->coerce_timestamps);
+ if (parquetOption_->allow_truncated_timestamps) {
+ builder.allow_truncated_timestamps();
+ }
+ if (parquetOption_->store_schema) {
+ builder.store_schema();
+ }
+ if (parquetOption_->executor) {
+ builder.set_executor(parquetOption_->executor);
+ }
+ }
+ return builder.build();
+}
+
+#ifdef ARROW_ORC
+arrow::adapters::orc::WriteOptions WriterOptions::getOrcOption() const {
+ auto writer_options = arrow::adapters::orc::WriteOptions();
+ writer_options.compression = arrow::Compression::ZSTD;
+ if (orcOption_) {
+ writer_options.batch_size = orcOption_->batch_size;
+ writer_options.compression = orcOption_->compression;
+ writer_options.stripe_size = orcOption_->stripe_size;
+ writer_options.file_version = orcOption_->file_version;
+ writer_options.bloom_filter_columns = orcOption_->bloom_filter_columns;
+ writer_options.bloom_filter_fpp = orcOption_->bloom_filter_fpp;
+ }
+ return writer_options;
+}
+#endif
+} // namespace graphar
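
Note: the per-format builders can be chained, and the getter helpers in this
file are what FileSystem::WriteTableToFile() consumes when serializing a
chunk. A minimal sketch of configuring CSV output (names taken from this
patch; the '|' delimiter is illustrative):

    graphar::WriterOptions::CSVOptionBuilder csv_builder;
    csv_builder.include_header(true).delimiter('|');
    std::shared_ptr<graphar::WriterOptions> options = csv_builder.build();
    // WriteTableToFile() later converts this back to arrow::csv::WriteOptions
    // via options->getCsvOption().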
diff --git a/cpp/src/graphar/writer_util.h b/cpp/src/graphar/writer_util.h
index 4f266ffa..e8e08e36 100644
--- a/cpp/src/graphar/writer_util.h
+++ b/cpp/src/graphar/writer_util.h
@@ -18,10 +18,398 @@
*/
#pragma once
-
-#include "graphar/macros.h"
+#include <parquet/properties.h>
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+#ifdef ARROW_ORC
+#include "arrow/adapters/orc/adapter.h"
+#endif
+#include "arrow/api.h"
+#include "arrow/csv/api.h"
+#include "arrow/dataset/api.h"
+#include "arrow/filesystem/api.h"
+#include "parquet/arrow/writer.h"
namespace graphar {
+/**
+ * @class WriterOptions
+ * @brief Provides configuration options for different file format writers (CSV,
+ * Parquet, ORC) in GraphAr. A WriterOptions instance can simultaneously contain
+ * options for all three formats (CSV, Parquet, ORC). The actual file format
+ * used for writing is determined by the FileType specified in the GraphInfo.
+ *
+ * The configuration parameters and their default values are aligned with those
+ * in Arrow.
+ */
+class WriterOptions {
+ private:
+ /**
+ * @class CSVOption
+ * @brief Configuration options for CSV Writer.
+ * The complete list of supported parameters can be found in:
+ * https://arrow.apache.org/docs/cpp/api/formats.html#csv-writer
+ */
+ class CSVOption {
+ public:
+ arrow::io::IOContext io_context;
+ std::string null_string;
+ std::string eol = "\n";
+ bool include_header = true;
+ char delimiter = ',';
+ int32_t batch_size = 1024;
+ arrow::csv::QuotingStyle quoting_style = arrow::csv::QuotingStyle::Needed;
+ };
+ /**
+ * @class ParquetOption
+ * @brief Configuration options for Parquet Writer.
+ * This class includes parameters from both `WriterProperties` and
+ * `ArrowWriterProperties`. The complete list of supported parameters can be
+   * found in: https://arrow.apache.org/docs/cpp/api/formats.html#parquet-writer
+ */
+ class ParquetOption {
+ public:
+ std::shared_ptr<::parquet::FileEncryptionProperties> encryption_properties;
+ std::unordered_map<std::string, ::parquet::Encoding::type> column_encoding;
+ std::unordered_map<std::string, arrow::Compression::type>
+ column_compression;
+ std::unordered_map<std::string, int> column_compression_level;
+ std::unordered_map<std::string, size_t> column_max_statistics_size;
+ std::unordered_map<std::string, bool> column_statistics;
+ std::unordered_map<std::string, bool> column_write_page_index;
+ std::vector<::parquet::SortingColumn> sorting_columns;
+ int64_t dictionary_pagesize_limit = 1024 * 1024;
+ int64_t write_batch_size = 1024;
+ int64_t max_row_group_length = 1024 * 1024;
+ int64_t data_pagesize = 1024 * 1024;
+ size_t max_statistics_size = 4096;
+ int compression_level = std::numeric_limits<int>::min();
+ ::parquet::ParquetDataPageVersion data_page_version =
+ ::parquet::ParquetDataPageVersion::V1;
+ ::parquet::ParquetVersion::type version =
+ ::parquet::ParquetVersion::PARQUET_2_6;
+ ::parquet::Encoding::type encoding = ::parquet::Encoding::PLAIN;
+ arrow::Compression::type compression = arrow::Compression::ZSTD;
+ ::arrow::TimeUnit::type coerce_timestamps = ::arrow::TimeUnit::MICRO;
+ ::arrow::internal::Executor* executor = nullptr;
+ bool enable_dictionary = true;
+ bool enable_statistics = true;
+ bool enable_store_decimal_as_integer = false;
+ bool enable_write_page_index = false;
+ bool compliant_nested_types = true;
+ bool use_threads = false;
+ bool enable_deprecated_int96_timestamps = false;
+ bool allow_truncated_timestamps = false;
+ bool store_schema = false;
+ };
+ /**
+ * @class ORCOption
+ * @brief Configuration options for ORC Writer.
+ * The complete list of supported parameters can be found in:
+   * https://arrow.apache.org/docs/cpp/api/formats.html#_CPPv4N5arrow8adapters3orc12WriteOptionsE
+ */
+ class ORCOption {
+#ifdef ARROW_ORC
+ public:
+ std::vector<int64_t> bloom_filter_columns;
+ arrow::adapters::orc::FileVersion file_version =
+ arrow::adapters::orc::FileVersion(0, 12);
+ arrow::adapters::orc::CompressionStrategy compression_strategy =
+ arrow::adapters::orc::CompressionStrategy::kSpeed;
+ arrow::Compression::type compression = arrow::Compression::UNCOMPRESSED;
+ int64_t stripe_size = 64 * 1024 * 1024;
+ int64_t batch_size = 1024;
+ int64_t compression_block_size = 64 * 1024;
+ int64_t row_index_stride = 10000;
+ double padding_tolerance = 0.0;
+ double dictionary_key_size_threshold = 0.0;
+ double bloom_filter_fpp = 0.05;
+#endif
+ };
+
+ public:
+ // Builder for CSVOption
+ class CSVOptionBuilder {
+ public:
+ CSVOptionBuilder() : option_(std::make_shared<CSVOption>()) {}
+ explicit CSVOptionBuilder(std::shared_ptr<WriterOptions> wopt)
+ : writerOptions_(wopt),
+ option_(wopt ? wopt->csvOption_ : std::make_shared<CSVOption>()) {}
+ CSVOptionBuilder& include_header(bool header) {
+ option_->include_header = header;
+ return *this;
+ }
+ CSVOptionBuilder& batch_size(int32_t bs) {
+ option_->batch_size = bs;
+ return *this;
+ }
+ CSVOptionBuilder& delimiter(char d) {
+ option_->delimiter = d;
+ return *this;
+ }
+ CSVOptionBuilder& null_string(const std::string& ns) {
+ option_->null_string = ns;
+ return *this;
+ }
+ CSVOptionBuilder& io_context(const arrow::io::IOContext& ctx) {
+ option_->io_context = ctx;
+ return *this;
+ }
+ CSVOptionBuilder& eol(const std::string& e) {
+ option_->eol = e;
+ return *this;
+ }
+ CSVOptionBuilder& quoting_style(arrow::csv::QuotingStyle qs) {
+ option_->quoting_style = qs;
+ return *this;
+ }
+ std::shared_ptr<WriterOptions> build() {
+ return writerOptions_
+ ? writerOptions_
+ : std::make_shared<WriterOptions>(option_, nullptr, nullptr);
+ }
+
+ private:
+ std::shared_ptr<WriterOptions> writerOptions_;
+ std::shared_ptr<CSVOption> option_;
+ };
+
+ // Builder for ParquetOption
+ class ParquetOptionBuilder {
+ public:
+ ParquetOptionBuilder() : option_(std::make_shared<ParquetOption>()) {}
+ explicit ParquetOptionBuilder(std::shared_ptr<WriterOptions> wopt)
+ : writerOptions_(wopt),
+ option_(wopt ? wopt->parquetOption_
+ : std::make_shared<ParquetOption>()) {}
+ ParquetOptionBuilder& enable_dictionary(bool enable) {
+ option_->enable_dictionary = enable;
+ return *this;
+ }
+ ParquetOptionBuilder& dictionary_pagesize_limit(int64_t limit) {
+ option_->dictionary_pagesize_limit = limit;
+ return *this;
+ }
+ ParquetOptionBuilder& write_batch_size(int64_t batch_size) {
+ option_->write_batch_size = batch_size;
+ return *this;
+ }
+ ParquetOptionBuilder& max_row_group_length(int64_t length) {
+ option_->max_row_group_length = length;
+ return *this;
+ }
+ ParquetOptionBuilder& data_pagesize(int64_t pagesize) {
+ option_->data_pagesize = pagesize;
+ return *this;
+ }
+ ParquetOptionBuilder& data_page_version(
+ ::parquet::ParquetDataPageVersion version) {
+ option_->data_page_version = version;
+ return *this;
+ }
+ ParquetOptionBuilder& version(::parquet::ParquetVersion::type ver) {
+ option_->version = ver;
+ return *this;
+ }
+ ParquetOptionBuilder& encoding(::parquet::Encoding::type enc) {
+ option_->encoding = enc;
+ return *this;
+ }
+ ParquetOptionBuilder& column_encoding(
+ const std::unordered_map<std::string, ::parquet::Encoding::type>&
+ encodings) {
+ option_->column_encoding = encodings;
+ return *this;
+ }
+ ParquetOptionBuilder& compression(arrow::Compression::type comp) {
+ option_->compression = comp;
+ return *this;
+ }
+ ParquetOptionBuilder& column_compression(
+ const std::unordered_map<std::string, arrow::Compression::type>&
+ compressions) {
+ option_->column_compression = compressions;
+ return *this;
+ }
+ ParquetOptionBuilder& compression_level(int level) {
+ option_->compression_level = level;
+ return *this;
+ }
+ ParquetOptionBuilder& column_compression_level(
+ const std::unordered_map<std::string, int>& levels) {
+ option_->column_compression_level = levels;
+ return *this;
+ }
+ ParquetOptionBuilder& max_statistics_size(size_t size) {
+ option_->max_statistics_size = size;
+ return *this;
+ }
+ ParquetOptionBuilder& column_max_statistics_size(
+ const std::unordered_map<std::string, size_t>& sizes) {
+ option_->column_max_statistics_size = sizes;
+ return *this;
+ }
+ ParquetOptionBuilder& encryption_properties(
+ const std::shared_ptr<::parquet::FileEncryptionProperties>& props) {
+ option_->encryption_properties = props;
+ return *this;
+ }
+ ParquetOptionBuilder& enable_statistics(bool enable) {
+ option_->enable_statistics = enable;
+ return *this;
+ }
+ ParquetOptionBuilder& column_statistics(
+ const std::unordered_map<std::string, bool>& stats) {
+ option_->column_statistics = stats;
+ return *this;
+ }
+ ParquetOptionBuilder& sorting_columns(
+ const std::vector<::parquet::SortingColumn>& columns) {
+ option_->sorting_columns = columns;
+ return *this;
+ }
+ ParquetOptionBuilder& enable_store_decimal_as_integer(bool enable) {
+ option_->enable_store_decimal_as_integer = enable;
+ return *this;
+ }
+ ParquetOptionBuilder& enable_write_page_index(bool enable) {
+ option_->enable_write_page_index = enable;
+ return *this;
+ }
+ ParquetOptionBuilder& column_write_page_index(
+ const std::unordered_map<std::string, bool>& indices) {
+ option_->column_write_page_index = indices;
+ return *this;
+ }
+ ParquetOptionBuilder& compliant_nested_types(bool compliant) {
+ option_->compliant_nested_types = compliant;
+ return *this;
+ }
+ ParquetOptionBuilder& use_threads(bool use) {
+ option_->use_threads = use;
+ return *this;
+ }
+ ParquetOptionBuilder& enable_deprecated_int96_timestamps(bool enable) {
+ option_->enable_deprecated_int96_timestamps = enable;
+ return *this;
+ }
+ ParquetOptionBuilder& coerce_timestamps(::arrow::TimeUnit::type unit) {
+ option_->coerce_timestamps = unit;
+ return *this;
+ }
+ ParquetOptionBuilder& allow_truncated_timestamps(bool allow) {
+ option_->allow_truncated_timestamps = allow;
+ return *this;
+ }
+ ParquetOptionBuilder& store_schema(bool store) {
+ option_->store_schema = store;
+ return *this;
+ }
+ ParquetOptionBuilder& executor(::arrow::internal::Executor* exec) {
+ option_->executor = exec;
+ return *this;
+ }
+ std::shared_ptr<WriterOptions> build() {
+ return writerOptions_
+ ? writerOptions_
+ : std::make_shared<WriterOptions>(nullptr, option_, nullptr);
+ }
+
+ private:
+ std::shared_ptr<WriterOptions> writerOptions_;
+ std::shared_ptr<ParquetOption> option_;
+ };
+
+ // Builder for ORCOption
+ class ORCOptionBuilder {
+ public:
+ ORCOptionBuilder() : option_(std::make_shared<ORCOption>()) {}
+ explicit ORCOptionBuilder(std::shared_ptr<WriterOptions> wopt)
+ : writerOptions_(wopt),
+ option_(wopt ? wopt->orcOption_ : std::make_shared<ORCOption>()) {}
+#ifdef ARROW_ORC
+ ORCOptionBuilder& batch_size(int64_t bs) {
+ option_->batch_size = bs;
+ return *this;
+ }
+ ORCOptionBuilder& file_version(arrow::adapters::orc::FileVersion fv) {
+ option_->file_version = fv;
+ return *this;
+ }
+ ORCOptionBuilder& stripe_size(int64_t ss) {
+ option_->stripe_size = ss;
+ return *this;
+ }
+ ORCOptionBuilder& compression(arrow::Compression::type comp) {
+ option_->compression = comp;
+ return *this;
+ }
+ ORCOptionBuilder& compression_block_size(int64_t cbs) {
+ option_->compression_block_size = cbs;
+ return *this;
+ }
+ ORCOptionBuilder& compression_strategy(
+ arrow::adapters::orc::CompressionStrategy cs) {
+ option_->compression_strategy = cs;
+ return *this;
+ }
+ ORCOptionBuilder& row_index_stride(int64_t ris) {
+ option_->row_index_stride = ris;
+ return *this;
+ }
+ ORCOptionBuilder& padding_tolerance(double pt) {
+ option_->padding_tolerance = pt;
+ return *this;
+ }
+ ORCOptionBuilder& dictionary_key_size_threshold(double dkst) {
+ option_->dictionary_key_size_threshold = dkst;
+ return *this;
+ }
+ ORCOptionBuilder& bloom_filter_columns(const int64_t bfc) {
+ option_->bloom_filter_columns.push_back(bfc);
+ return *this;
+ }
+ ORCOptionBuilder& bloom_filter_fpp(double bffpp) {
+ option_->bloom_filter_fpp = bffpp;
+ return *this;
+ }
+#endif
+ std::shared_ptr<WriterOptions> build() {
+ return writerOptions_
+ ? writerOptions_
+ : std::make_shared<WriterOptions>(nullptr, nullptr, option_);
+ }
+
+ private:
+ std::shared_ptr<WriterOptions> writerOptions_;
+ std::shared_ptr<ORCOption> option_;
+ };
+
+ WriterOptions() = default;
+ WriterOptions(std::shared_ptr<CSVOption> csv,
+ std::shared_ptr<ParquetOption> parquet,
+ std::shared_ptr<ORCOption> orc)
+ : csvOption_(csv), parquetOption_(parquet), orcOption_(orc) {}
+ static std::shared_ptr<WriterOptions> DefaultWriterOption() {
+ return std::make_shared<WriterOptions>();
+ }
+ arrow::csv::WriteOptions getCsvOption() const;
+  std::shared_ptr<parquet::WriterProperties> getParquetWriterProperties() const;
+ std::shared_ptr<parquet::ArrowWriterProperties> getArrowWriterProperties()
+ const;
+#ifdef ARROW_ORC
+ arrow::adapters::orc::WriteOptions getOrcOption() const;
+#endif
+
+ private:
+ std::shared_ptr<CSVOption> csvOption_;
+ std::shared_ptr<ParquetOption> parquetOption_;
+ std::shared_ptr<ORCOption> orcOption_;
+};
/**
* @brief The level for validating writing operations.
diff --git a/cpp/test/test_arrow_chunk_writer.cc b/cpp/test/test_arrow_chunk_writer.cc
index cc35e25e..48814835 100644
--- a/cpp/test/test_arrow_chunk_writer.cc
+++ b/cpp/test/test_arrow_chunk_writer.cc
@@ -17,15 +17,19 @@
* under the License.
*/
+#include <parquet/types.h>
#include <fstream>
#include <iostream>
+#include <ostream>
#include <sstream>
#include <string>
+#include "arrow/api.h"
+#include "graphar/label.h"
+#include "graphar/writer_util.h"
#ifdef ARROW_ORC
#include "arrow/adapters/orc/adapter.h"
#endif
-#include "arrow/api.h"
#include "arrow/csv/api.h"
#include "arrow/filesystem/api.h"
#include "arrow/io/api.h"
@@ -67,11 +71,11 @@ TEST_CASE_METHOD(GlobalFixture, "TestVertexPropertyWriter") {
std::cout << table->num_rows() << ' ' << table->num_columns() << std::endl;
// Construct the writer
- std::string vertex_meta_file =
+ std::string vertex_meta_file_parquet =
test_data_dir + "/ldbc_sample/parquet/" + "person.vertex.yml";
- auto vertex_meta = Yaml::LoadFile(vertex_meta_file).value();
- auto vertex_info = VertexInfo::Load(vertex_meta).value();
- auto maybe_writer = VertexPropertyWriter::Make(vertex_info, "/tmp/");
+ auto vertex_meta_parquet = Yaml::LoadFile(vertex_meta_file_parquet).value();
+ auto vertex_info_parquet = VertexInfo::Load(vertex_meta_parquet).value();
+ auto maybe_writer = VertexPropertyWriter::Make(vertex_info_parquet, "/tmp/");
REQUIRE(!maybe_writer.has_error());
auto writer = maybe_writer.value();
@@ -98,7 +102,7 @@ TEST_CASE_METHOD(GlobalFixture, "TestVertexPropertyWriter") {
// Out of range
REQUIRE(writer->WriteChunk(table, 0).IsInvalid());
// Invalid chunk id
- auto chunk = table->Slice(0, vertex_info->GetChunkSize());
+ auto chunk = table->Slice(0, vertex_info_parquet->GetChunkSize());
REQUIRE(writer->WriteChunk(chunk, -1).IsIndexError());
// Invalid property group
Property p1("invalid_property", int32(), false);
@@ -108,10 +112,10 @@ TEST_CASE_METHOD(GlobalFixture, "TestVertexPropertyWriter") {
std::shared_ptr<arrow::Table> tmp_table =
table->RenameColumns({"original_id", "firstName", "lastName", "id"})
.ValueOrDie();
- auto pg2 = vertex_info->GetPropertyGroup("firstName");
+ auto pg2 = vertex_info_parquet->GetPropertyGroup("firstName");
REQUIRE(writer->WriteTable(tmp_table, pg2, 0).IsInvalid());
// Invalid data type
- auto pg3 = vertex_info->GetPropertyGroup("id");
+ auto pg3 = vertex_info_parquet->GetPropertyGroup("id");
REQUIRE(writer->WriteTable(tmp_table, pg3, 0).IsTypeError());
#ifdef ARROW_ORC
@@ -154,39 +158,155 @@ TEST_CASE_METHOD(GlobalFixture, "TestVertexPropertyWriter") {
table2->GetColumnByName("gender")->ToString());
}
#endif
-
- SECTION("TestEdgeChunkWriter") {
- arrow::Status st;
+ SECTION("TestVertexPropertyWriterWithOption") {
+ // csv file
+ // Construct the writer
+ std::string vertex_meta_file_csv =
+ test_data_dir + "/ldbc_sample/csv/" + "person.vertex.yml";
+ auto vertex_meta_csv = Yaml::LoadFile(vertex_meta_file_csv).value();
+ auto vertex_info_csv = VertexInfo::Load(vertex_meta_csv).value();
+ auto csv_options = WriterOptions::CSVOptionBuilder();
+ csv_options.include_header(true);
+ csv_options.delimiter('|');
+ auto maybe_writer = VertexPropertyWriter::Make(
+ vertex_info_csv, "/tmp/option/", csv_options.build());
+ REQUIRE(!maybe_writer.has_error());
+ auto writer = maybe_writer.value();
+ REQUIRE(writer->WriteTable(table, 0).ok());
+ // read csv file
+ auto parse_options = arrow::csv::ParseOptions::Defaults();
+ parse_options.delimiter = '|';
+ std::shared_ptr<arrow::io::InputStream> chunk0_input =
+ fs->OpenInputStream(
+ "/tmp/option/vertex/person/firstName_lastName_gender/chunk0")
+ .ValueOrDie();
+ auto read_options = arrow::csv::ReadOptions::Defaults();
+ auto csv_reader =
+ arrow::csv::TableReader::Make(arrow::io::default_io_context(),
+                                      chunk0_input, read_options, parse_options,
+ arrow::csv::ConvertOptions::Defaults())
+ .ValueOrDie();
+ auto maybe_table = csv_reader->Read();
+ REQUIRE(maybe_table.ok());
+ std::shared_ptr<arrow::Table> csv_table = *maybe_table;
+ REQUIRE(csv_table->num_rows() == vertex_info_csv->GetChunkSize());
+ REQUIRE(csv_table->num_columns() ==
+ static_cast<int>(vertex_info_csv->GetPropertyGroup("firstName")
+ ->GetProperties()
+ .size()) +
+ 1);
+ // type parquet
+ auto options_parquet_Builder = WriterOptions::ParquetOptionBuilder();
+    options_parquet_Builder.compression(arrow::Compression::type::UNCOMPRESSED);
+ options_parquet_Builder.enable_statistics(false);
+ parquet::SortingColumn sc;
+ sc.column_idx = 1;
+ std::vector<::parquet::SortingColumn> columns = {sc};
+ options_parquet_Builder.sorting_columns(columns)
+ .enable_store_decimal_as_integer(true)
+ .max_row_group_length(10);
+ maybe_writer = VertexPropertyWriter::Make(
+ vertex_info_parquet, "/tmp/option/", options_parquet_Builder.build());
+ REQUIRE(!maybe_writer.has_error());
+ writer = maybe_writer.value();
+ REQUIRE(writer->WriteTable(table, 0).ok());
+ // read parquet file
+ std::string parquet_file =
+ "/tmp/option/vertex/person/firstName_lastName_gender/chunk0";
+ auto parquet_fs =
+ arrow::fs::FileSystemFromUriOrPath(parquet_file).ValueOrDie();
+ std::shared_ptr<arrow::io::RandomAccessFile> parquet_input =
+ parquet_fs->OpenInputFile(parquet_file).ValueOrDie();
+ std::unique_ptr<parquet::arrow::FileReader> parquet_reader;
+ auto st = parquet::arrow::OpenFile(
+ parquet_input, arrow::default_memory_pool(), &parquet_reader);
+ REQUIRE(st.ok());
+ std::shared_ptr<arrow::Table> parquet_table;
+ st = parquet_reader->ReadTable(&parquet_table);
+ REQUIRE(st.ok());
+ auto parquet_metadata = parquet_reader->parquet_reader()->metadata();
+ auto row_group_meta = parquet_metadata->RowGroup(0);
+ auto col_meta = row_group_meta->ColumnChunk(0);
+ REQUIRE(row_group_meta->sorting_columns().size() == 1);
+ REQUIRE(row_group_meta->sorting_columns()[0].column_idx == 1);
+ REQUIRE(col_meta->compression() == parquet::Compression::UNCOMPRESSED);
+ REQUIRE(!col_meta->statistics());
+ REQUIRE(parquet_table->num_rows() == vertex_info_parquet->GetChunkSize());
+ REQUIRE(parquet_metadata->num_row_groups() ==
+ parquet_table->num_rows() / 10);
+ REQUIRE(parquet_table->num_columns() ==
+ static_cast<int>(vertex_info_parquet->GetPropertyGroup("firstName")
+ ->GetProperties()
+ .size() +
+ 1));
+#ifdef ARROW_ORC
+ std::string vertex_meta_file_orc =
+ test_data_dir + "/ldbc_sample/orc/" + "person.vertex.yml";
+ auto vertex_meta_orc = Yaml::LoadFile(vertex_meta_file_orc).value();
+ auto vertex_info_orc = VertexInfo::Load(vertex_meta_orc).value();
+ auto optionsOrcBuilder = WriterOptions::ORCOptionBuilder();
+ optionsOrcBuilder.compression(arrow::Compression::type::ZSTD);
+ maybe_writer = VertexPropertyWriter::Make(vertex_info_orc, "/tmp/option/",
+ optionsOrcBuilder.build());
+ REQUIRE(!maybe_writer.has_error());
+ writer = maybe_writer.value();
+ REQUIRE(writer->WriteTable(table, 0).ok());
+ auto fs1 = arrow::fs::FileSystemFromUriOrPath(
+                   "/tmp/option/vertex/person/firstName_lastName_gender/chunk0")
+ .ValueOrDie();
+ std::shared_ptr<arrow::io::RandomAccessFile> input1 =
+ fs1->OpenInputFile(
+ "/tmp/option/vertex/person/firstName_lastName_gender/chunk0")
+ .ValueOrDie();
arrow::MemoryPool* pool = arrow::default_memory_pool();
- std::string path = test_data_dir +
- "/ldbc_sample/parquet/edge/person_knows_person/"
- "unordered_by_source/adj_list/part0/chunk0";
- auto fs = arrow::fs::FileSystemFromUriOrPath(path).ValueOrDie();
- std::shared_ptr<arrow::io::RandomAccessFile> input =
- fs->OpenInputFile(path).ValueOrDie();
- std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
- st = parquet::arrow::OpenFile(input, pool, &arrow_reader);
+ std::unique_ptr<arrow::adapters::orc::ORCFileReader> reader =
+ arrow::adapters::orc::ORCFileReader::Open(input1, pool).ValueOrDie();
// Read entire file as a single Arrow table
- std::shared_ptr<arrow::Table> maybe_table;
- st = arrow_reader->ReadTable(&maybe_table);
- REQUIRE(st.ok());
+ maybe_table = reader->Read();
+ std::shared_ptr<arrow::Table> table1 = maybe_table.ValueOrDie();
+ REQUIRE(reader->GetCompression() == parquet::Compression::ZSTD);
+ REQUIRE(table1->num_rows() == vertex_info_parquet->GetChunkSize());
+ REQUIRE(table1->num_columns() ==
+ static_cast<int>(vertex_info_parquet->GetPropertyGroup("firstName")
+ ->GetProperties()
+ .size()) +
+ 1);
+#endif
+ }
+}
+TEST_CASE_METHOD(GlobalFixture, "TestEdgeChunkWriter") {
+ arrow::Status st;
+ arrow::MemoryPool* pool = arrow::default_memory_pool();
+ std::string path = test_data_dir +
+ "/ldbc_sample/parquet/edge/person_knows_person/"
+ "unordered_by_source/adj_list/part0/chunk0";
+ auto fs = arrow::fs::FileSystemFromUriOrPath(path).ValueOrDie();
+ std::shared_ptr<arrow::io::RandomAccessFile> input =
+ fs->OpenInputFile(path).ValueOrDie();
+ std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
+ st = parquet::arrow::OpenFile(input, pool, &arrow_reader);
+ // Read entire file as a single Arrow table
+ std::shared_ptr<arrow::Table> maybe_table;
+ st = arrow_reader->ReadTable(&maybe_table);
+ REQUIRE(st.ok());
- std::shared_ptr<arrow::Table> table =
- maybe_table
- ->RenameColumns(
- {GeneralParams::kSrcIndexCol, GeneralParams::kDstIndexCol})
- .ValueOrDie();
- std::cout << table->schema()->ToString() << std::endl;
- std::cout << table->num_rows() << ' ' << table->num_columns() << std::endl;
+ std::shared_ptr<arrow::Table> table =
+ maybe_table
+ ->RenameColumns(
+ {GeneralParams::kSrcIndexCol, GeneralParams::kDstIndexCol})
+ .ValueOrDie();
+ std::cout << table->schema()->ToString() << std::endl;
+ std::cout << table->num_rows() << ' ' << table->num_columns() << std::endl;
+ // Construct the writer
+ std::string edge_meta_file_csv =
+ test_data_dir + "/ldbc_sample/csv/" + "person_knows_person.edge.yml";
+ auto edge_meta_csv = Yaml::LoadFile(edge_meta_file_csv).value();
+ auto edge_info_csv = EdgeInfo::Load(edge_meta_csv).value();
+ auto adj_list_type = AdjListType::ordered_by_source;
- // Construct the writer
- std::string edge_meta_file =
- test_data_dir + "/ldbc_sample/csv/" + "person_knows_person.edge.yml";
- auto edge_meta = Yaml::LoadFile(edge_meta_file).value();
- auto edge_info = EdgeInfo::Load(edge_meta).value();
- auto adj_list_type = AdjListType::ordered_by_source;
+ SECTION("TestEdgeChunkWriterWithoutOption") {
auto maybe_writer =
- EdgeChunkWriter::Make(edge_info, "/tmp/", adj_list_type);
+ EdgeChunkWriter::Make(edge_info_csv, "/tmp/", adj_list_type);
REQUIRE(!maybe_writer.has_error());
auto writer = maybe_writer.value();
@@ -236,14 +356,14 @@ TEST_CASE_METHOD(GlobalFixture, "TestVertexPropertyWriter") {
// Invalid adj list type
auto invalid_adj_list_type = AdjListType::unordered_by_dest;
auto maybe_writer2 =
- EdgeChunkWriter::Make(edge_info, "/tmp/", invalid_adj_list_type);
+ EdgeChunkWriter::Make(edge_info_csv, "/tmp/", invalid_adj_list_type);
REQUIRE(maybe_writer2.has_error());
// Invalid property group
Property p1("invalid_property", int32(), false);
auto pg1 = CreatePropertyGroup({p1}, FileType::CSV);
REQUIRE(writer->WritePropertyChunk(table, pg1, 0, 0).IsKeyError());
// Property not found in table
- auto pg2 = edge_info->GetPropertyGroup("creationDate");
+ auto pg2 = edge_info_csv->GetPropertyGroup("creationDate");
REQUIRE(writer->WritePropertyChunk(table, pg2, 0, 0).IsInvalid());
// Required columns not found
std::shared_ptr<arrow::Table> tmp_table =
@@ -252,5 +372,120 @@ TEST_CASE_METHOD(GlobalFixture, "TestVertexPropertyWriter") {
// Invalid data type
REQUIRE(writer->WritePropertyChunk(tmp_table, pg2, 0, 0).IsTypeError());
}
+ SECTION("TestEdgeChunkWriterWithOption") {
+ WriterOptions::CSVOptionBuilder csv_options_builder;
+ csv_options_builder.include_header(true).delimiter('|');
+ auto maybe_writer =
+ EdgeChunkWriter::Make(edge_info_csv, "/tmp/option/", adj_list_type,
+ csv_options_builder.build());
+ REQUIRE(!maybe_writer.has_error());
+ auto writer = maybe_writer.value();
+
+ // Valid: Write adj list with options
+ REQUIRE(writer->SortAndWriteAdjListTable(table, 0, 0).ok());
+ // Valid: Write edge count
+ REQUIRE(writer->WriteEdgesNum(0, table->num_rows()).ok());
+ // Valid: Write vertex count
+ REQUIRE(writer->WriteVerticesNum(903).ok());
+
+ // Read back CSV file and check delimiter/header
+ auto parse_options = arrow::csv::ParseOptions::Defaults();
+ parse_options.delimiter = '|';
+ auto read_options = arrow::csv::ReadOptions::Defaults();
+ std::shared_ptr<arrow::io::InputStream> chunk0_input =
+ fs->OpenInputStream(
+            "/tmp/option/edge/person_knows_person/ordered_by_source/adj_list/"
+ "part0/chunk0")
+ .ValueOrDie();
+ auto csv_reader =
+ arrow::csv::TableReader::Make(arrow::io::default_io_context(),
+                                      chunk0_input, read_options, parse_options,
+ arrow::csv::ConvertOptions::Defaults())
+ .ValueOrDie();
+ auto maybe_table2 = csv_reader->Read();
+ REQUIRE(maybe_table2.ok());
+ std::shared_ptr<arrow::Table> csv_table = *maybe_table2;
+ REQUIRE(csv_table->num_rows() ==
+ std::min(edge_info_csv->GetChunkSize(), table->num_rows()));
+ REQUIRE(csv_table->num_columns() == table->num_columns());
+
+ // Parquet option
+ std::string edge_meta_file_parquet = test_data_dir +
+ "/ldbc_sample/parquet/" +
+ "person_knows_person.edge.yml";
+ auto edge_meta_parquet = Yaml::LoadFile(edge_meta_file_parquet).value();
+ auto edge_info_parquet = EdgeInfo::Load(edge_meta_parquet).value();
+ auto optionsBuilderParquet = WriterOptions::ParquetOptionBuilder();
+ optionsBuilderParquet.compression(arrow::Compression::type::UNCOMPRESSED);
+ optionsBuilderParquet.enable_statistics(false);
+ optionsBuilderParquet.enable_store_decimal_as_integer(true);
+ optionsBuilderParquet.max_row_group_length(10);
+ auto maybe_parquet_writer =
+ EdgeChunkWriter::Make(edge_info_parquet, "/tmp/option/", adj_list_type,
+ optionsBuilderParquet.build());
+ REQUIRE(!maybe_parquet_writer.has_error());
+ auto parquet_writer = maybe_parquet_writer.value();
+ REQUIRE(parquet_writer->SortAndWriteAdjListTable(table, 0, 0).ok());
+ std::string parquet_file =
+        "/tmp/option/edge/person_knows_person/ordered_by_source/adj_list/part0/"
+ "chunk0";
+ auto parquet_fs =
+ arrow::fs::FileSystemFromUriOrPath(parquet_file).ValueOrDie();
+ std::shared_ptr<arrow::io::RandomAccessFile> parquet_input =
+ parquet_fs->OpenInputFile(parquet_file).ValueOrDie();
+ std::unique_ptr<parquet::arrow::FileReader> parquet_reader;
+ auto st = parquet::arrow::OpenFile(
+ parquet_input, arrow::default_memory_pool(), &parquet_reader);
+ REQUIRE(st.ok());
+ std::shared_ptr<arrow::Table> parquet_table;
+ st = parquet_reader->ReadTable(&parquet_table);
+ REQUIRE(st.ok());
+ auto parquet_metadata = parquet_reader->parquet_reader()->metadata();
+ auto row_group_meta = parquet_metadata->RowGroup(0);
+ auto col_meta = row_group_meta->ColumnChunk(0);
+ REQUIRE(col_meta->compression() == parquet::Compression::UNCOMPRESSED);
+ REQUIRE(!col_meta->statistics());
+ REQUIRE(parquet_table->num_rows() ==
+ std::min(table->num_rows(), edge_info_parquet->GetChunkSize()));
+ REQUIRE(parquet_metadata->num_row_groups() ==
+ parquet_table->num_rows() / 10 + 1);
+ REQUIRE(parquet_table->num_columns() ==
+ static_cast<int>(table->num_columns()));
+
+#ifdef ARROW_ORC
+ // ORC option
+ std::string edge_meta_file_orc =
+ test_data_dir + "/ldbc_sample/orc/" + "person_knows_person.edge.yml";
+ auto edge_meta_orc = Yaml::LoadFile(edge_meta_file_orc).value();
+ auto edge_info_orc = EdgeInfo::Load(edge_meta_orc).value();
+ auto optionsBuilderOrc = WriterOptions::ORCOptionBuilder();
+ optionsBuilderOrc.compression(arrow::Compression::type::ZSTD);
+ auto maybe_orc_writer =
+ EdgeChunkWriter::Make(edge_info_orc, "/tmp/option/", adj_list_type,
+ optionsBuilderOrc.build());
+ REQUIRE(!maybe_orc_writer.has_error());
+ auto orc_writer = maybe_orc_writer.value();
+ REQUIRE(orc_writer->SortAndWriteAdjListTable(table, 0, 0).ok());
+ auto orc_fs = arrow::fs::FileSystemFromUriOrPath(
+ "/tmp/option/edge/person_knows_person/ordered_by_source/"
+ "adj_list/part0/chunk0")
+ .ValueOrDie();
+ std::shared_ptr<arrow::io::RandomAccessFile> orc_input =
+ orc_fs
+ ->OpenInputFile(
+ "/tmp/option/edge/person_knows_person/ordered_by_source/"
+ "adj_list/part0/chunk0")
+ .ValueOrDie();
+ arrow::MemoryPool* pool = arrow::default_memory_pool();
+ std::unique_ptr<arrow::adapters::orc::ORCFileReader> orc_reader =
+        arrow::adapters::orc::ORCFileReader::Open(orc_input, pool).ValueOrDie();
+ auto maybe_orc_table = orc_reader->Read();
+ REQUIRE(maybe_orc_table.ok());
+ std::shared_ptr<arrow::Table> orc_table = *maybe_orc_table;
+ REQUIRE(orc_reader->GetCompression() == parquet::Compression::ZSTD);
+ REQUIRE(orc_table->num_rows() == table->num_rows());
+ REQUIRE(orc_table->num_columns() == table->num_columns());
+#endif
+ }
}
} // namespace graphar
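
The ORC paths exercised by the tests above are only compiled when Arrow is
built with ORC support. A minimal sketch of the equivalent writer setup
(edge_info and the "/tmp/" prefix are illustrative):

    #ifdef ARROW_ORC
    auto orc_builder = graphar::WriterOptions::ORCOptionBuilder();
    orc_builder.compression(arrow::Compression::ZSTD);
    auto maybe_writer = graphar::EdgeChunkWriter::Make(
        edge_info, "/tmp/", graphar::AdjListType::ordered_by_source,
        orc_builder.build());
    #endif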
diff --git a/cpp/test/test_builder.cc b/cpp/test/test_builder.cc
index 63ebebc8..555a5669 100644
--- a/cpp/test/test_builder.cc
+++ b/cpp/test/test_builder.cc
@@ -37,7 +37,7 @@
#include <catch2/catch_test_macros.hpp>
namespace graphar {
-TEST_CASE_METHOD(GlobalFixture, "test_vertices_builder") {
+TEST_CASE_METHOD(GlobalFixture, "Test_vertices_builder") {
std::cout << "Test vertex builder" << std::endl;
// construct vertex builder
diff --git a/docs/libraries/cpp/getting-started.md b/docs/libraries/cpp/getting-started.md
index a9b9fa03..c0403a77 100644
--- a/docs/libraries/cpp/getting-started.md
+++ b/docs/libraries/cpp/getting-started.md
@@ -341,9 +341,11 @@ When you have the data ready, you can read the file into `arrow::Table` by using
```
You can export label table to disk in parquet format, and read it back into memory in the following way.
``` cpp
+ auto options_parquet_builder = WriterOptions::ParquetOptionBuilder();
+ options_parquet_builder.compression(arrow::Compression::type::UNCOMPRESSED);
// write arrow table as parquet chunk
auto maybe_writer =
-      VertexPropertyWriter::Make(vertex_info, test_data_dir + "/ldbc/parquet/");
+      VertexPropertyWriter::Make(vertex_info, test_data_dir + "/ldbc/parquet/", options_parquet_builder.build());
REQUIRE(!maybe_writer.has_error());
auto writer = maybe_writer.value();
REQUIRE(writer->WriteTable(table, 0).ok());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]