This is an automated email from the ASF dual-hosted git repository.

lixueclaire pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-graphar.git


The following commit(s) were added to refs/heads/main by this push:
     new 961c7b89 feat(c++): Add WriterOption to allow user to configure writer option (#700)
961c7b89 is described below

commit 961c7b89df91b575ee52cba05451a4863f9ca4f1
Author: Xiaokang Yang <[email protected]>
AuthorDate: Fri Jun 27 19:15:20 2025 +0800

    feat(c++): Add WriterOption to allow user to configure writer option (#700)
---
 cli/CMakeLists.txt                            |   7 +-
 cpp/examples/mid_level_writer_example.cc      |   9 +-
 cpp/src/graphar/arrow/chunk_writer.cc         |  73 ++++-
 cpp/src/graphar/arrow/chunk_writer.h          |  33 +++
 cpp/src/graphar/filesystem.cc                 |  27 +-
 cpp/src/graphar/filesystem.h                  |   6 +-
 cpp/src/graphar/high-level/edges_builder.cc   |   3 +-
 cpp/src/graphar/high-level/vertices_builder.h |   5 +-
 cpp/src/graphar/writer_util.cc                | 129 +++++++++
 cpp/src/graphar/writer_util.h                 | 392 +++++++++++++++++++++++++-
 cpp/test/test_arrow_chunk_writer.cc           | 311 +++++++++++++++++---
 cpp/test/test_builder.cc                      |   2 +-
 docs/libraries/cpp/getting-started.md         |   4 +-
 13 files changed, 923 insertions(+), 78 deletions(-)
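
In short, this change threads a new WriterOptions object through the writer
APIs. A minimal usage sketch, adapted from the example updated below
(vertex_info and the "/tmp/" prefix are assumed to be set up as in that
example):

    auto builder = graphar::WriterOptions::ParquetOptionBuilder();
    builder.compression(arrow::Compression::ZSTD);
    auto maybe_writer = graphar::VertexPropertyWriter::Make(
        vertex_info, "/tmp/", builder.build());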

diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index 3639bbf4..21caa61f 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -37,7 +37,12 @@ find_package(Arrow REQUIRED)
 find_package(ArrowDataset REQUIRED)
 find_package(ArrowAcero REQUIRED)
 find_package(Parquet REQUIRED)
-
+# Check if ORC is enabled.
+if (NOT ${ARROW_ORC})
+    message(WARNING "apache-arrow is built without ORC extension, ORC related functionalities will be disabled.")
+else()
+    add_definitions(-DARROW_ORC) # Add macro, otherwise inconsistent in build phase on ubuntu.
+endif()
 # Add a library using FindPython's tooling (pybind11 also provides a helper like
 # this)
 python_add_library(_core MODULE src/main.cc WITH_SOABI)
diff --git a/cpp/examples/mid_level_writer_example.cc b/cpp/examples/mid_level_writer_example.cc
index 3b25d9fb..7ecfad8d 100644
--- a/cpp/examples/mid_level_writer_example.cc
+++ b/cpp/examples/mid_level_writer_example.cc
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+#include <arrow/util/type_fwd.h>
 #include <iostream>
 
 #include "arrow/api.h"
@@ -25,6 +26,8 @@
 
 #include "./config.h"
 #include "graphar/api/arrow_writer.h"
+#include "graphar/fwd.h"
+#include "graphar/writer_util.h"
 
 arrow::Result<std::shared_ptr<arrow::Table>> generate_vertex_table() {
   // property "id"
@@ -108,8 +111,10 @@ void vertex_property_writer(
   auto vertex_meta = graphar::Yaml::LoadFile(vertex_meta_file).value();
   auto vertex_info = graphar::VertexInfo::Load(vertex_meta).value();
   ASSERT(vertex_info->GetType() == "person");
-
-  auto maybe_writer = graphar::VertexPropertyWriter::Make(vertex_info, "/tmp/");
+  auto builder = graphar::WriterOptions::ParquetOptionBuilder();
+  builder.compression(arrow::Compression::ZSTD);
+  auto maybe_writer = graphar::VertexPropertyWriter::Make(vertex_info, "/tmp/",
+                                                          builder.build());
   ASSERT(maybe_writer.status().ok());
   auto writer = maybe_writer.value();
 
diff --git a/cpp/src/graphar/arrow/chunk_writer.cc b/cpp/src/graphar/arrow/chunk_writer.cc
index 964886b0..234bbfb3 100644
--- a/cpp/src/graphar/arrow/chunk_writer.cc
+++ b/cpp/src/graphar/arrow/chunk_writer.cc
@@ -17,11 +17,13 @@
  * under the License.
  */
 
+#include <cstddef>
 #include <unordered_map>
 #include <utility>
 
 #include "arrow/api.h"
 #include "arrow/compute/api.h"
+#include "graphar/writer_util.h"
 #if defined(ARROW_VERSION) && ARROW_VERSION >= 12000000
 #include "arrow/acero/exec_plan.h"
 #else
@@ -101,10 +103,15 @@ Result<std::shared_ptr<arrow::Table>> ExecutePlanAndCollectAsTable(
 
 VertexPropertyWriter::VertexPropertyWriter(
     const std::shared_ptr<VertexInfo>& vertex_info, const std::string& prefix,
+    const std::shared_ptr<WriterOptions>& options,
     const ValidateLevel& validate_level)
     : vertex_info_(vertex_info),
       prefix_(prefix),
-      validate_level_(validate_level) {
+      validate_level_(validate_level),
+      options_(options) {
+  if (!options) {
+    options_ = WriterOptions::DefaultWriterOption();
+  }
   if (validate_level_ == ValidateLevel::default_validate) {
     throw std::runtime_error(
         "default_validate is not allowed to be set as the global validate "
@@ -237,7 +244,7 @@ Status VertexPropertyWriter::WriteChunk(
   GAR_ASSIGN_OR_RAISE(auto suffix,
                       vertex_info_->GetFilePath(property_group, chunk_index));
   std::string path = prefix_ + suffix;
-  return fs_->WriteTableToFile(in_table, file_type, path);
+  return fs_->WriteTableToFile(in_table, file_type, path, options_);
 }
 
 Status VertexPropertyWriter::WriteChunk(
@@ -395,11 +402,11 @@ Result<std::shared_ptr<arrow::Table>> VertexPropertyWriter::GetLabelTable(
   for (const auto& label : labels) {
     arrow::BooleanBuilder builder;
     for (const auto& row : bool_matrix) {
-      builder.Append(row[label_to_index[label]]);
+      RETURN_NOT_ARROW_OK(builder.Append(row[label_to_index[label]]));
     }
 
     std::shared_ptr<arrow::Array> array;
-    builder.Finish(&array);
+    RETURN_NOT_ARROW_OK(builder.Finish(&array));
     fields.push_back(arrow::field(label, arrow::boolean()));
     arrays.push_back(array);
   }
@@ -413,19 +420,35 @@ Result<std::shared_ptr<arrow::Table>> VertexPropertyWriter::GetLabelTable(
 
 Result<std::shared_ptr<VertexPropertyWriter>> VertexPropertyWriter::Make(
     const std::shared_ptr<VertexInfo>& vertex_info, const std::string& prefix,
+    const std::shared_ptr<WriterOptions>& options,
     const ValidateLevel& validate_level) {
-  return std::make_shared<VertexPropertyWriter>(vertex_info, prefix,
+  return std::make_shared<VertexPropertyWriter>(vertex_info, prefix, options,
                                                 validate_level);
 }
 
 Result<std::shared_ptr<VertexPropertyWriter>> VertexPropertyWriter::Make(
     const std::shared_ptr<GraphInfo>& graph_info, const std::string& type,
+    const std::shared_ptr<WriterOptions>& options,
     const ValidateLevel& validate_level) {
   auto vertex_info = graph_info->GetVertexInfo(type);
   if (!vertex_info) {
     return Status::KeyError("The vertex ", type, " doesn't exist.");
   }
-  return Make(vertex_info, graph_info->GetPrefix(), validate_level);
+  return Make(vertex_info, graph_info->GetPrefix(), options, validate_level);
+}
+
+Result<std::shared_ptr<VertexPropertyWriter>> VertexPropertyWriter::Make(
+    const std::shared_ptr<VertexInfo>& vertex_info, const std::string& prefix,
+    const ValidateLevel& validate_level) {
+  return Make(vertex_info, prefix, WriterOptions::DefaultWriterOption(),
+              validate_level);
+}
+
+Result<std::shared_ptr<VertexPropertyWriter>> VertexPropertyWriter::Make(
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& type,
+    const ValidateLevel& validate_level) {
+  return Make(graph_info, type, WriterOptions::DefaultWriterOption(),
+              validate_level);
 }
 
 Result<std::shared_ptr<arrow::Table>> VertexPropertyWriter::AddIndexColumn(
@@ -454,10 +477,15 @@ Result<std::shared_ptr<arrow::Table>> VertexPropertyWriter::AddIndexColumn(
 EdgeChunkWriter::EdgeChunkWriter(const std::shared_ptr<EdgeInfo>& edge_info,
                                  const std::string& prefix,
                                  AdjListType adj_list_type,
+                                 const std::shared_ptr<WriterOptions>& options,
                                  const ValidateLevel& validate_level)
     : edge_info_(edge_info),
       adj_list_type_(adj_list_type),
-      validate_level_(validate_level) {
+      validate_level_(validate_level),
+      options_(options) {
+  if (!options) {
+    options_ = WriterOptions::DefaultWriterOption();
+  }
   if (validate_level_ == ValidateLevel::default_validate) {
     throw std::runtime_error(
         "default_validate is not allowed to be set as the global validate "
@@ -715,7 +743,7 @@ Status EdgeChunkWriter::WriteOffsetChunk(
   GAR_ASSIGN_OR_RAISE(auto suffix, edge_info_->GetAdjListOffsetFilePath(
                                        vertex_chunk_index, adj_list_type_));
   std::string path = prefix_ + suffix;
-  return fs_->WriteTableToFile(in_table, file_type, path);
+  return fs_->WriteTableToFile(in_table, file_type, path, options_);
 }
 
 Status EdgeChunkWriter::WriteAdjListChunk(
@@ -747,7 +775,7 @@ Status EdgeChunkWriter::WriteAdjListChunk(
       auto suffix, edge_info_->GetAdjListFilePath(vertex_chunk_index,
                                                   chunk_index, adj_list_type_));
   std::string path = prefix_ + suffix;
-  return fs_->WriteTableToFile(in_table, file_type, path);
+  return fs_->WriteTableToFile(in_table, file_type, path, options_);
 }
 
 Status EdgeChunkWriter::WritePropertyChunk(
@@ -777,7 +805,7 @@ Status EdgeChunkWriter::WritePropertyChunk(
                                        property_group, adj_list_type_,
                                        vertex_chunk_index, chunk_index));
   std::string path = prefix_ + suffix;
-  return fs_->WriteTableToFile(in_table, file_type, path);
+  return fs_->WriteTableToFile(in_table, file_type, path, options_);
 }
 
 Status EdgeChunkWriter::WritePropertyChunk(
@@ -1001,29 +1029,46 @@ Result<std::shared_ptr<arrow::Table>> EdgeChunkWriter::sortTable(
 
 Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make(
     const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
-    AdjListType adj_list_type, const ValidateLevel& validate_level) {
+    AdjListType adj_list_type, const std::shared_ptr<WriterOptions>& options,
+    const ValidateLevel& validate_level) {
   if (!edge_info->HasAdjacentListType(adj_list_type)) {
     return Status::KeyError(
         "The adjacent list type ", AdjListTypeToString(adj_list_type),
         " doesn't exist in edge ", edge_info->GetEdgeType(), ".");
   }
   return std::make_shared<EdgeChunkWriter>(edge_info, prefix, adj_list_type,
-                                           validate_level);
+                                           options, validate_level);
+}
+
+Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make(
+    const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
+    AdjListType adj_list_type, const ValidateLevel& validate_level) {
+  return Make(edge_info, prefix, adj_list_type,
+              WriterOptions::DefaultWriterOption(), validate_level);
 }
 
 Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make(
     const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_type,
     const std::string& edge_type, const std::string& dst_type,
-    AdjListType adj_list_type, const ValidateLevel& validate_level) {
+    AdjListType adj_list_type, const std::shared_ptr<WriterOptions>& options,
+    const ValidateLevel& validate_level) {
   auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type);
   if (!edge_info) {
     return Status::KeyError("The edge ", src_type, " ", edge_type, " ",
                             dst_type, " doesn't exist.");
   }
-  return Make(edge_info, graph_info->GetPrefix(), adj_list_type,
+  return Make(edge_info, graph_info->GetPrefix(), adj_list_type, options,
               validate_level);
 }
 
+Result<std::shared_ptr<EdgeChunkWriter>> EdgeChunkWriter::Make(
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_type,
+    const std::string& edge_type, const std::string& dst_type,
+    AdjListType adj_list_type, const ValidateLevel& validate_level) {
+  return Make(graph_info, src_type, edge_type, dst_type, adj_list_type,
+              WriterOptions::DefaultWriterOption(), validate_level);
+}
+
 std::string EdgeChunkWriter::getSortColumnName(AdjListType adj_list_type) {
   switch (adj_list_type) {
   case AdjListType::unordered_by_source:
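
EdgeChunkWriter gains matching overloads. A hedged sketch of constructing an
edge writer with custom Parquet options, assuming edge_info is loaded as in
the tests further below:

    auto maybe_writer = graphar::EdgeChunkWriter::Make(
        edge_info, "/tmp/", graphar::AdjListType::ordered_by_source,
        graphar::WriterOptions::ParquetOptionBuilder()
            .compression(arrow::Compression::ZSTD)
            .build());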
diff --git a/cpp/src/graphar/arrow/chunk_writer.h b/cpp/src/graphar/arrow/chunk_writer.h
index 89d33126..76066f28 100644
--- a/cpp/src/graphar/arrow/chunk_writer.h
+++ b/cpp/src/graphar/arrow/chunk_writer.h
@@ -68,6 +68,8 @@ class VertexPropertyWriter {
    */
   explicit VertexPropertyWriter(
      const std::shared_ptr<VertexInfo>& vertex_info, const std::string& prefix,
+      const std::shared_ptr<WriterOptions>& options =
+          WriterOptions::DefaultWriterOption(),
       const ValidateLevel& validate_level = ValidateLevel::no_validate);
 
   /**
@@ -214,7 +216,13 @@ class VertexPropertyWriter {
    * @param prefix The absolute prefix.
    * @param validate_level The global validate level for the writer, default is
    * no_validate.
+   * @param options Options for writing the table, such as compression.
    */
+  static Result<std::shared_ptr<VertexPropertyWriter>> Make(
+      const std::shared_ptr<VertexInfo>& vertex_info, const std::string& prefix,
+      const std::shared_ptr<WriterOptions>& options,
+      const ValidateLevel& validate_level = ValidateLevel::no_validate);
+
   static Result<std::shared_ptr<VertexPropertyWriter>> Make(
      const std::shared_ptr<VertexInfo>& vertex_info, const std::string& prefix,
       const ValidateLevel& validate_level = ValidateLevel::no_validate);
@@ -226,11 +234,21 @@ class VertexPropertyWriter {
    * @param type The vertex type.
    * @param validate_level The global validate level for the writer, default is
    * no_validate.
+   * @param options Options for writing the table, such as compression.
    */
+  static Result<std::shared_ptr<VertexPropertyWriter>> Make(
+      const std::shared_ptr<GraphInfo>& graph_info, const std::string& type,
+      const std::shared_ptr<WriterOptions>& options,
+      const ValidateLevel& validate_level = ValidateLevel::no_validate);
+
   static Result<std::shared_ptr<VertexPropertyWriter>> Make(
       const std::shared_ptr<GraphInfo>& graph_info, const std::string& type,
       const ValidateLevel& validate_level = ValidateLevel::no_validate);
 
+  void setWriterOptions(const std::shared_ptr<WriterOptions>& options) {
+    options_ = options;
+  }
+
   Result<std::shared_ptr<arrow::Table>> AddIndexColumn(
       const std::shared_ptr<arrow::Table>& table, IdType chunk_index,
       IdType chunk_size) const;
@@ -274,6 +292,7 @@ class VertexPropertyWriter {
   std::string prefix_;
   std::shared_ptr<FileSystem> fs_;
   ValidateLevel validate_level_;
+  std::shared_ptr<WriterOptions> options_;
 };
 
 /**
@@ -314,6 +333,8 @@ class EdgeChunkWriter {
   explicit EdgeChunkWriter(
       const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
       AdjListType adj_list_type,
+      const std::shared_ptr<WriterOptions>& options =
+          WriterOptions::DefaultWriterOption(),
       const ValidateLevel& validate_level = ValidateLevel::no_validate);
 
   /**
@@ -584,6 +605,11 @@ class EdgeChunkWriter {
    * @param validate_level The global validate level for the writer, default is
    * no_validate.
    */
+  static Result<std::shared_ptr<EdgeChunkWriter>> Make(
+      const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
+      AdjListType adj_list_type, const std::shared_ptr<WriterOptions>& options,
+      const ValidateLevel& validate_level = ValidateLevel::no_validate);
+
   static Result<std::shared_ptr<EdgeChunkWriter>> Make(
       const std::shared_ptr<EdgeInfo>& edge_info, const std::string& prefix,
       AdjListType adj_list_type,
@@ -600,6 +626,12 @@ class EdgeChunkWriter {
    * @param validate_level The global validate level for the writer, default is
    * no_validate.
    */
+  static Result<std::shared_ptr<EdgeChunkWriter>> Make(
+      const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_type,
+      const std::string& edge_type, const std::string& dst_type,
+      AdjListType adj_list_type, const std::shared_ptr<WriterOptions>& options,
+      const ValidateLevel& validate_level = ValidateLevel::no_validate);
+
   static Result<std::shared_ptr<EdgeChunkWriter>> Make(
      const std::shared_ptr<GraphInfo>& graph_info, const std::string& src_type,
       const std::string& edge_type, const std::string& dst_type,
@@ -714,6 +746,7 @@ class EdgeChunkWriter {
   std::string prefix_;
   std::shared_ptr<FileSystem> fs_;
   ValidateLevel validate_level_;
+  std::shared_ptr<WriterOptions> options_;
 };
 
 }  // namespace graphar
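
VertexPropertyWriter additionally exposes setWriterOptions for replacing the
options after construction; a one-line sketch, assuming writer is a
VertexPropertyWriter created as in the earlier example:

    writer->setWriterOptions(graphar::WriterOptions::DefaultWriterOption());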
diff --git a/cpp/src/graphar/filesystem.cc b/cpp/src/graphar/filesystem.cc
index e246eeb4..abc6d975 100644
--- a/cpp/src/graphar/filesystem.cc
+++ b/cpp/src/graphar/filesystem.cc
@@ -17,6 +17,8 @@
  * under the License.
  */
 
+#include <memory>
+#include "graphar/writer_util.h"
 #ifdef ARROW_ORC
 #include "arrow/adapters/orc/adapter.h"
 #endif
@@ -243,42 +245,37 @@ Status FileSystem::WriteValueToFile(const std::string& value,
   return Status::OK();
 }
 
-Status FileSystem::WriteTableToFile(const std::shared_ptr<arrow::Table>& table,
-                                    FileType file_type,
-                                    const std::string& path) const noexcept {
+Status FileSystem::WriteTableToFile(
+    const std::shared_ptr<arrow::Table>& table, FileType file_type,
+    const std::string& path,
+    const std::shared_ptr<WriterOptions>& options) const noexcept {
   // try to create the directory, oss filesystem may not support this, ignore
   ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/"))));
   GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto output_stream,
                                        arrow_fs_->OpenOutputStream(path));
   switch (file_type) {
   case FileType::CSV: {
-    auto write_options = arrow::csv::WriteOptions::Defaults();
-    write_options.include_header = true;
-    write_options.quoting_style = arrow::csv::QuotingStyle::Needed;
     GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
-        auto writer, arrow::csv::MakeCSVWriter(output_stream.get(),
-                                               table->schema(), write_options));
+        auto writer,
+        arrow::csv::MakeCSVWriter(output_stream.get(), table->schema(),
+                                  options->getCsvOption()));
     RETURN_NOT_ARROW_OK(writer->WriteTable(*table));
     RETURN_NOT_ARROW_OK(writer->Close());
     break;
   }
   case FileType::PARQUET: {
     auto schema = table->schema();
-    auto column_num = schema->num_fields();
-    parquet::WriterProperties::Builder builder;
-    builder.compression(arrow::Compression::type::ZSTD);  // enable compression
     RETURN_NOT_ARROW_OK(parquet::arrow::WriteTable(
         *table, arrow::default_memory_pool(), output_stream, 64 * 1024 * 1024,
-        builder.build(), parquet::default_arrow_writer_properties()));
+        options->getParquetWriterProperties(),
+        options->getArrowWriterProperties()));
     break;
   }
 #ifdef ARROW_ORC
   case FileType::ORC: {
-    auto writer_options = arrow::adapters::orc::WriteOptions();
-    writer_options.compression = arrow::Compression::type::ZSTD;
     GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(
         auto writer, arrow::adapters::orc::ORCFileWriter::Open(
-                         output_stream.get(), writer_options));
+                         output_stream.get(), options->getOrcOption()));
     RETURN_NOT_ARROW_OK(writer->Write(*table));
     RETURN_NOT_ARROW_OK(writer->Close());
     break;
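
Note that every write path now forwards the options to
FileSystem::WriteTableToFile instead of hardcoding format settings; the
defaults (ZSTD for Parquet/ORC, header plus needed-only quoting for CSV)
reproduce the previous behavior. A sketch of the internal call, assuming fs,
table, and path are in scope:

    auto status = fs->WriteTableToFile(
        table, graphar::FileType::PARQUET, path,
        graphar::WriterOptions::DefaultWriterOption());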
diff --git a/cpp/src/graphar/filesystem.h b/cpp/src/graphar/filesystem.h
index 986b7d59..5c78c4bc 100644
--- a/cpp/src/graphar/filesystem.h
+++ b/cpp/src/graphar/filesystem.h
@@ -20,7 +20,6 @@
 #pragma once
 
 #include <memory>
-#include <optional>
 #include <string>
 #include <vector>
 
@@ -30,6 +29,7 @@
 #include "graphar/util.h"
 
 #include "graphar/reader_util.h"
+#include "graphar/writer_util.h"
 
 // forward declarations
 namespace arrow {
@@ -112,10 +112,12 @@ class FileSystem {
    * @param input_table The table to write.
    * @param file_type The type of the output file.
    * @param path The path of the output file.
+   * @param options Options for writing the table, such as compression.
    * @return A Status indicating OK if successful, or an error if unsuccessful.
    */
   Status WriteTableToFile(const std::shared_ptr<arrow::Table>& table,
-                          FileType file_type, const std::string& path) const
+                          FileType file_type, const std::string& path,
+                          const std::shared_ptr<WriterOptions>& options) const
       noexcept;
 
   /**
diff --git a/cpp/src/graphar/high-level/edges_builder.cc b/cpp/src/graphar/high-level/edges_builder.cc
index 74352274..38141fb2 100644
--- a/cpp/src/graphar/high-level/edges_builder.cc
+++ b/cpp/src/graphar/high-level/edges_builder.cc
@@ -28,7 +28,8 @@ namespace graphar::builder {
 
 Status EdgesBuilder::Dump() {
   // construct the writer
-  EdgeChunkWriter writer(edge_info_, prefix_, adj_list_type_, validate_level_);
+  EdgeChunkWriter writer(edge_info_, prefix_, adj_list_type_, nullptr,
+                         validate_level_);
   // construct empty edge collections for vertex chunks without edges
   IdType num_vertex_chunks =
       (num_vertices_ + vertex_chunk_size_ - 1) / vertex_chunk_size_;
diff --git a/cpp/src/graphar/high-level/vertices_builder.h b/cpp/src/graphar/high-level/vertices_builder.h
index 7ccf8e35..55b505d1 100644
--- a/cpp/src/graphar/high-level/vertices_builder.h
+++ b/cpp/src/graphar/high-level/vertices_builder.h
@@ -20,6 +20,7 @@
 #pragma once
 
 #include <any>
+#include <cstddef>
 #include <memory>
 #include <string>
 #include <unordered_map>
@@ -241,7 +242,9 @@ class VerticesBuilder {
    */
   Status Dump() {
     // construct the writer
-    VertexPropertyWriter writer(vertex_info_, prefix_, validate_level_);
+    // TODO(yangxk) Allow users to use custom options instead of nullptr
+    VertexPropertyWriter writer(vertex_info_, prefix_, nullptr,
+                                validate_level_);
     IdType start_chunk_index =
         start_vertex_index_ / vertex_info_->GetChunkSize();
     // convert to table
diff --git a/cpp/src/graphar/writer_util.cc b/cpp/src/graphar/writer_util.cc
new file mode 100644
index 00000000..380e3df0
--- /dev/null
+++ b/cpp/src/graphar/writer_util.cc
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "graphar/writer_util.h"
+namespace graphar {
+arrow::csv::WriteOptions WriterOptions::getCsvOption() const {
+  if (csvOption_) {
+    arrow::csv::WriteOptions csvWriteOptions;
+    csvWriteOptions.include_header = csvOption_->include_header;
+    csvWriteOptions.batch_size = csvOption_->batch_size;
+    csvWriteOptions.delimiter = csvOption_->delimiter;
+    csvWriteOptions.null_string = csvOption_->null_string;
+    csvWriteOptions.io_context = csvOption_->io_context;
+    csvWriteOptions.eol = csvOption_->eol;
+    csvWriteOptions.quoting_style = csvOption_->quoting_style;
+    return csvWriteOptions;
+  } else {
+    return arrow::csv::WriteOptions::Defaults();
+  }
+}
+
+std::shared_ptr<parquet::WriterProperties>
+WriterOptions::getParquetWriterProperties() const {
+  parquet::WriterProperties::Builder builder;
+  if (parquetOption_) {
+    builder
+        .dictionary_pagesize_limit(parquetOption_->dictionary_pagesize_limit)
+        ->write_batch_size(parquetOption_->write_batch_size)
+        ->max_row_group_length(parquetOption_->max_row_group_length)
+        ->data_pagesize(parquetOption_->data_pagesize)
+        ->data_page_version(parquetOption_->data_page_version)
+        ->version(parquetOption_->version)
+        ->encoding(parquetOption_->encoding)
+        ->max_statistics_size(parquetOption_->max_statistics_size)
+        ->compression(parquetOption_->compression)
+        ->compression_level(parquetOption_->compression_level);
+    for (const auto& kv : parquetOption_->column_encoding) {
+      builder.encoding(kv.first, kv.second);
+    }
+    for (const auto& kv : parquetOption_->column_compression) {
+      builder.compression(kv.first, kv.second);
+    }
+    for (const auto& kv : parquetOption_->column_compression_level) {
+      builder.compression_level(kv.first, kv.second);
+    }
+    if (!parquetOption_->enable_dictionary) {
+      builder.disable_dictionary();
+    }
+    if (parquetOption_->encryption_properties) {
+      builder.encryption(parquetOption_->encryption_properties);
+    }
+    if (!parquetOption_->enable_statistics) {
+      builder.disable_statistics();
+    }
+    for (const auto& path_st : parquetOption_->column_statistics) {
+      if (!path_st.second) {
+        builder.disable_statistics(path_st.first);
+      }
+    }
+    if (!parquetOption_->sorting_columns.empty()) {
+      builder.set_sorting_columns(parquetOption_->sorting_columns);
+    }
+    if (parquetOption_->enable_store_decimal_as_integer) {
+      builder.enable_store_decimal_as_integer();
+    }
+    if (parquetOption_->enable_write_page_index) {
+      builder.enable_write_page_index();
+    }
+  }
+  return builder.build();
+}
+
+std::shared_ptr<parquet::ArrowWriterProperties>
+WriterOptions::getArrowWriterProperties() const {
+  parquet::ArrowWriterProperties::Builder builder;
+  if (parquetOption_) {
+    if (!parquetOption_->compliant_nested_types) {
+      builder.disable_compliant_nested_types();
+    }
+    builder.set_use_threads(parquetOption_->use_threads);
+    if (parquetOption_->enable_deprecated_int96_timestamps) {
+      builder.enable_deprecated_int96_timestamps();
+    }
+    builder.coerce_timestamps(parquetOption_->coerce_timestamps);
+    if (parquetOption_->allow_truncated_timestamps) {
+      builder.allow_truncated_timestamps();
+    }
+    if (parquetOption_->store_schema) {
+      builder.store_schema();
+    }
+    if (parquetOption_->executor) {
+      builder.set_executor(parquetOption_->executor);
+    }
+  }
+  return builder.build();
+}
+
+#ifdef ARROW_ORC
+arrow::adapters::orc::WriteOptions WriterOptions::getOrcOption() const {
+  auto writer_options = arrow::adapters::orc::WriteOptions();
+  writer_options.compression = arrow::Compression::ZSTD;
+  if (orcOption_) {
+    writer_options.batch_size = orcOption_->batch_size;
+    writer_options.compression = orcOption_->compression;
+    writer_options.stripe_size = orcOption_->stripe_size;
+    writer_options.file_version = orcOption_->file_version;
+    writer_options.bloom_filter_columns = orcOption_->bloom_filter_columns;
+    writer_options.bloom_filter_fpp = orcOption_->bloom_filter_fpp;
+  }
+  return writer_options;
+}
+#endif
+}  // namespace graphar
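
ORC options follow the same builder pattern, available only when Arrow is
built with ORC (per the ARROW_ORC guard). A hedged sketch:

    auto orc_opts = graphar::WriterOptions::ORCOptionBuilder()
                        .compression(arrow::Compression::ZSTD)
                        .stripe_size(64 * 1024 * 1024)
                        .build();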
diff --git a/cpp/src/graphar/writer_util.h b/cpp/src/graphar/writer_util.h
index 4f266ffa..e8e08e36 100644
--- a/cpp/src/graphar/writer_util.h
+++ b/cpp/src/graphar/writer_util.h
@@ -18,10 +18,398 @@
  */
 
 #pragma once
-
-#include "graphar/macros.h"
+#include <parquet/properties.h>
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+#ifdef ARROW_ORC
+#include "arrow/adapters/orc/adapter.h"
+#endif
+#include "arrow/api.h"
+#include "arrow/csv/api.h"
+#include "arrow/dataset/api.h"
+#include "arrow/filesystem/api.h"
+#include "parquet/arrow/writer.h"
 
 namespace graphar {
+/**
+ * @class WriterOptions
+ * @brief Provides configuration options for different file format writers (CSV,
+ * Parquet, ORC) in GraphAr. A WriterOptions instance can simultaneously contain
+ * options for all three formats (CSV, Parquet, ORC). The actual file format
+ * used for writing is determined by the FileType specified in the graph_Info.
+ *
+ * The configuration parameters and their default values are aligned with those
+ * in Arrow.
+ */
+class WriterOptions {
+ private:
+  /**
+   * @class CSVOption
+   * @brief Configuration options for CSV Writer.
+   * The complete list of supported parameters can be found in:
+   * https://arrow.apache.org/docs/cpp/api/formats.html#csv-writer
+   */
+  class CSVOption {
+   public:
+    arrow::io::IOContext io_context;
+    std::string null_string;
+    std::string eol = "\n";
+    bool include_header = true;
+    char delimiter = ',';
+    int32_t batch_size = 1024;
+    arrow::csv::QuotingStyle quoting_style = arrow::csv::QuotingStyle::Needed;
+  };
+  /**
+   * @class ParquetOption
+   * @brief Configuration options for Parquet Writer.
+   * This class includes parameters from both `WriterProperties` and
+   * `ArrowWriterProperties`. The complete list of supported parameters can be
+   * found in: https://arrow.apache.org/docs/cpp/api/formats.html#parquet-writer
+   */
+  class ParquetOption {
+   public:
+    std::shared_ptr<::parquet::FileEncryptionProperties> encryption_properties;
+    std::unordered_map<std::string, ::parquet::Encoding::type> column_encoding;
+    std::unordered_map<std::string, arrow::Compression::type>
+        column_compression;
+    std::unordered_map<std::string, int> column_compression_level;
+    std::unordered_map<std::string, size_t> column_max_statistics_size;
+    std::unordered_map<std::string, bool> column_statistics;
+    std::unordered_map<std::string, bool> column_write_page_index;
+    std::vector<::parquet::SortingColumn> sorting_columns;
+    int64_t dictionary_pagesize_limit = 1024 * 1024;
+    int64_t write_batch_size = 1024;
+    int64_t max_row_group_length = 1024 * 1024;
+    int64_t data_pagesize = 1024 * 1024;
+    size_t max_statistics_size = 4096;
+    int compression_level = std::numeric_limits<int>::min();
+    ::parquet::ParquetDataPageVersion data_page_version =
+        ::parquet::ParquetDataPageVersion::V1;
+    ::parquet::ParquetVersion::type version =
+        ::parquet::ParquetVersion::PARQUET_2_6;
+    ::parquet::Encoding::type encoding = ::parquet::Encoding::PLAIN;
+    arrow::Compression::type compression = arrow::Compression::ZSTD;
+    ::arrow::TimeUnit::type coerce_timestamps = ::arrow::TimeUnit::MICRO;
+    ::arrow::internal::Executor* executor = nullptr;
+    bool enable_dictionary = true;
+    bool enable_statistics = true;
+    bool enable_store_decimal_as_integer = false;
+    bool enable_write_page_index = false;
+    bool compliant_nested_types = true;
+    bool use_threads = false;
+    bool enable_deprecated_int96_timestamps = false;
+    bool allow_truncated_timestamps = false;
+    bool store_schema = false;
+  };
+  /**
+   * @class ORCOption
+   * @brief Configuration options for ORC Writer.
+   * The complete list of supported parameters can be found in:
+   * https://arrow.apache.org/docs/cpp/api/formats.html#_CPPv4N5arrow8adapters3orc12WriteOptionsE
+   */
+  class ORCOption {
+#ifdef ARROW_ORC
+   public:
+    std::vector<int64_t> bloom_filter_columns;
+    arrow::adapters::orc::FileVersion file_version =
+        arrow::adapters::orc::FileVersion(0, 12);
+    arrow::adapters::orc::CompressionStrategy compression_strategy =
+        arrow::adapters::orc::CompressionStrategy::kSpeed;
+    arrow::Compression::type compression = arrow::Compression::UNCOMPRESSED;
+    int64_t stripe_size = 64 * 1024 * 1024;
+    int64_t batch_size = 1024;
+    int64_t compression_block_size = 64 * 1024;
+    int64_t row_index_stride = 10000;
+    double padding_tolerance = 0.0;
+    double dictionary_key_size_threshold = 0.0;
+    double bloom_filter_fpp = 0.05;
+#endif
+  };
+
+ public:
+  // Builder for CSVOption
+  class CSVOptionBuilder {
+   public:
+    CSVOptionBuilder() : option_(std::make_shared<CSVOption>()) {}
+    explicit CSVOptionBuilder(std::shared_ptr<WriterOptions> wopt)
+        : writerOptions_(wopt),
+          option_(wopt ? wopt->csvOption_ : std::make_shared<CSVOption>()) {}
+    CSVOptionBuilder& include_header(bool header) {
+      option_->include_header = header;
+      return *this;
+    }
+    CSVOptionBuilder& batch_size(int32_t bs) {
+      option_->batch_size = bs;
+      return *this;
+    }
+    CSVOptionBuilder& delimiter(char d) {
+      option_->delimiter = d;
+      return *this;
+    }
+    CSVOptionBuilder& null_string(const std::string& ns) {
+      option_->null_string = ns;
+      return *this;
+    }
+    CSVOptionBuilder& io_context(const arrow::io::IOContext& ctx) {
+      option_->io_context = ctx;
+      return *this;
+    }
+    CSVOptionBuilder& eol(const std::string& e) {
+      option_->eol = e;
+      return *this;
+    }
+    CSVOptionBuilder& quoting_style(arrow::csv::QuotingStyle qs) {
+      option_->quoting_style = qs;
+      return *this;
+    }
+    std::shared_ptr<WriterOptions> build() {
+      return writerOptions_
+                 ? writerOptions_
+                 : std::make_shared<WriterOptions>(option_, nullptr, nullptr);
+    }
+
+   private:
+    std::shared_ptr<WriterOptions> writerOptions_;
+    std::shared_ptr<CSVOption> option_;
+  };
+
+  // Builder for ParquetOption
+  class ParquetOptionBuilder {
+   public:
+    ParquetOptionBuilder() : option_(std::make_shared<ParquetOption>()) {}
+    explicit ParquetOptionBuilder(std::shared_ptr<WriterOptions> wopt)
+        : writerOptions_(wopt),
+          option_(wopt ? wopt->parquetOption_
+                       : std::make_shared<ParquetOption>()) {}
+    ParquetOptionBuilder& enable_dictionary(bool enable) {
+      option_->enable_dictionary = enable;
+      return *this;
+    }
+    ParquetOptionBuilder& dictionary_pagesize_limit(int64_t limit) {
+      option_->dictionary_pagesize_limit = limit;
+      return *this;
+    }
+    ParquetOptionBuilder& write_batch_size(int64_t batch_size) {
+      option_->write_batch_size = batch_size;
+      return *this;
+    }
+    ParquetOptionBuilder& max_row_group_length(int64_t length) {
+      option_->max_row_group_length = length;
+      return *this;
+    }
+    ParquetOptionBuilder& data_pagesize(int64_t pagesize) {
+      option_->data_pagesize = pagesize;
+      return *this;
+    }
+    ParquetOptionBuilder& data_page_version(
+        ::parquet::ParquetDataPageVersion version) {
+      option_->data_page_version = version;
+      return *this;
+    }
+    ParquetOptionBuilder& version(::parquet::ParquetVersion::type ver) {
+      option_->version = ver;
+      return *this;
+    }
+    ParquetOptionBuilder& encoding(::parquet::Encoding::type enc) {
+      option_->encoding = enc;
+      return *this;
+    }
+    ParquetOptionBuilder& column_encoding(
+        const std::unordered_map<std::string, ::parquet::Encoding::type>&
+            encodings) {
+      option_->column_encoding = encodings;
+      return *this;
+    }
+    ParquetOptionBuilder& compression(arrow::Compression::type comp) {
+      option_->compression = comp;
+      return *this;
+    }
+    ParquetOptionBuilder& column_compression(
+        const std::unordered_map<std::string, arrow::Compression::type>&
+            compressions) {
+      option_->column_compression = compressions;
+      return *this;
+    }
+    ParquetOptionBuilder& compression_level(int level) {
+      option_->compression_level = level;
+      return *this;
+    }
+    ParquetOptionBuilder& column_compression_level(
+        const std::unordered_map<std::string, int>& levels) {
+      option_->column_compression_level = levels;
+      return *this;
+    }
+    ParquetOptionBuilder& max_statistics_size(size_t size) {
+      option_->max_statistics_size = size;
+      return *this;
+    }
+    ParquetOptionBuilder& column_max_statistics_size(
+        const std::unordered_map<std::string, size_t>& sizes) {
+      option_->column_max_statistics_size = sizes;
+      return *this;
+    }
+    ParquetOptionBuilder& encryption_properties(
+        const std::shared_ptr<::parquet::FileEncryptionProperties>& props) {
+      option_->encryption_properties = props;
+      return *this;
+    }
+    ParquetOptionBuilder& enable_statistics(bool enable) {
+      option_->enable_statistics = enable;
+      return *this;
+    }
+    ParquetOptionBuilder& column_statistics(
+        const std::unordered_map<std::string, bool>& stats) {
+      option_->column_statistics = stats;
+      return *this;
+    }
+    ParquetOptionBuilder& sorting_columns(
+        const std::vector<::parquet::SortingColumn>& columns) {
+      option_->sorting_columns = columns;
+      return *this;
+    }
+    ParquetOptionBuilder& enable_store_decimal_as_integer(bool enable) {
+      option_->enable_store_decimal_as_integer = enable;
+      return *this;
+    }
+    ParquetOptionBuilder& enable_write_page_index(bool enable) {
+      option_->enable_write_page_index = enable;
+      return *this;
+    }
+    ParquetOptionBuilder& column_write_page_index(
+        const std::unordered_map<std::string, bool>& indices) {
+      option_->column_write_page_index = indices;
+      return *this;
+    }
+    ParquetOptionBuilder& compliant_nested_types(bool compliant) {
+      option_->compliant_nested_types = compliant;
+      return *this;
+    }
+    ParquetOptionBuilder& use_threads(bool use) {
+      option_->use_threads = use;
+      return *this;
+    }
+    ParquetOptionBuilder& enable_deprecated_int96_timestamps(bool enable) {
+      option_->enable_deprecated_int96_timestamps = enable;
+      return *this;
+    }
+    ParquetOptionBuilder& coerce_timestamps(::arrow::TimeUnit::type unit) {
+      option_->coerce_timestamps = unit;
+      return *this;
+    }
+    ParquetOptionBuilder& allow_truncated_timestamps(bool allow) {
+      option_->allow_truncated_timestamps = allow;
+      return *this;
+    }
+    ParquetOptionBuilder& store_schema(bool store) {
+      option_->store_schema = store;
+      return *this;
+    }
+    ParquetOptionBuilder& executor(::arrow::internal::Executor* exec) {
+      option_->executor = exec;
+      return *this;
+    }
+    std::shared_ptr<WriterOptions> build() {
+      return writerOptions_
+                 ? writerOptions_
+                 : std::make_shared<WriterOptions>(nullptr, option_, nullptr);
+    }
+
+   private:
+    std::shared_ptr<WriterOptions> writerOptions_;
+    std::shared_ptr<ParquetOption> option_;
+  };
+
+  // Builder for ORCOption
+  class ORCOptionBuilder {
+   public:
+    ORCOptionBuilder() : option_(std::make_shared<ORCOption>()) {}
+    explicit ORCOptionBuilder(std::shared_ptr<WriterOptions> wopt)
+        : writerOptions_(wopt),
+          option_(wopt ? wopt->orcOption_ : std::make_shared<ORCOption>()) {}
+#ifdef ARROW_ORC
+    ORCOptionBuilder& batch_size(int64_t bs) {
+      option_->batch_size = bs;
+      return *this;
+    }
+    ORCOptionBuilder& file_version(arrow::adapters::orc::FileVersion fv) {
+      option_->file_version = fv;
+      return *this;
+    }
+    ORCOptionBuilder& stripe_size(int64_t ss) {
+      option_->stripe_size = ss;
+      return *this;
+    }
+    ORCOptionBuilder& compression(arrow::Compression::type comp) {
+      option_->compression = comp;
+      return *this;
+    }
+    ORCOptionBuilder& compression_block_size(int64_t cbs) {
+      option_->compression_block_size = cbs;
+      return *this;
+    }
+    ORCOptionBuilder& compression_strategy(
+        arrow::adapters::orc::CompressionStrategy cs) {
+      option_->compression_strategy = cs;
+      return *this;
+    }
+    ORCOptionBuilder& row_index_stride(int64_t ris) {
+      option_->row_index_stride = ris;
+      return *this;
+    }
+    ORCOptionBuilder& padding_tolerance(double pt) {
+      option_->padding_tolerance = pt;
+      return *this;
+    }
+    ORCOptionBuilder& dictionary_key_size_threshold(double dkst) {
+      option_->dictionary_key_size_threshold = dkst;
+      return *this;
+    }
+    ORCOptionBuilder& bloom_filter_columns(const int64_t bfc) {
+      option_->bloom_filter_columns.push_back(bfc);
+      return *this;
+    }
+    ORCOptionBuilder& bloom_filter_fpp(double bffpp) {
+      option_->bloom_filter_fpp = bffpp;
+      return *this;
+    }
+#endif
+    std::shared_ptr<WriterOptions> build() {
+      return writerOptions_
+                 ? writerOptions_
+                 : std::make_shared<WriterOptions>(nullptr, nullptr, option_);
+    }
+
+   private:
+    std::shared_ptr<WriterOptions> writerOptions_;
+    std::shared_ptr<ORCOption> option_;
+  };
+
+  WriterOptions() = default;
+  WriterOptions(std::shared_ptr<CSVOption> csv,
+                std::shared_ptr<ParquetOption> parquet,
+                std::shared_ptr<ORCOption> orc)
+      : csvOption_(csv), parquetOption_(parquet), orcOption_(orc) {}
+  static std::shared_ptr<WriterOptions> DefaultWriterOption() {
+    return std::make_shared<WriterOptions>();
+  }
+  arrow::csv::WriteOptions getCsvOption() const;
+  std::shared_ptr<parquet::WriterProperties> getParquetWriterProperties() const;
+  std::shared_ptr<parquet::ArrowWriterProperties> getArrowWriterProperties()
+      const;
+#ifdef ARROW_ORC
+  arrow::adapters::orc::WriteOptions getOrcOption() const;
+#endif
+
+ private:
+  std::shared_ptr<CSVOption> csvOption_;
+  std::shared_ptr<ParquetOption> parquetOption_;
+  std::shared_ptr<ORCOption> orcOption_;
+};
 
 /**
  * @brief The level for validating writing operations.
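
CSV writing is configurable the same way; for instance, the tests below
exercise a pipe delimiter via the chainable builder:

    auto csv_opts = graphar::WriterOptions::CSVOptionBuilder()
                        .include_header(true)
                        .delimiter('|')
                        .build();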
diff --git a/cpp/test/test_arrow_chunk_writer.cc b/cpp/test/test_arrow_chunk_writer.cc
index cc35e25e..48814835 100644
--- a/cpp/test/test_arrow_chunk_writer.cc
+++ b/cpp/test/test_arrow_chunk_writer.cc
@@ -17,15 +17,19 @@
  * under the License.
  */
 
+#include <parquet/types.h>
 #include <fstream>
 #include <iostream>
+#include <ostream>
 #include <sstream>
 #include <string>
 
+#include "arrow/api.h"
+#include "graphar/label.h"
+#include "graphar/writer_util.h"
 #ifdef ARROW_ORC
 #include "arrow/adapters/orc/adapter.h"
 #endif
-#include "arrow/api.h"
 #include "arrow/csv/api.h"
 #include "arrow/filesystem/api.h"
 #include "arrow/io/api.h"
@@ -67,11 +71,11 @@ TEST_CASE_METHOD(GlobalFixture, "TestVertexPropertyWriter") {
   std::cout << table->num_rows() << ' ' << table->num_columns() << std::endl;
 
   // Construct the writer
-  std::string vertex_meta_file =
+  std::string vertex_meta_file_parquet =
       test_data_dir + "/ldbc_sample/parquet/" + "person.vertex.yml";
-  auto vertex_meta = Yaml::LoadFile(vertex_meta_file).value();
-  auto vertex_info = VertexInfo::Load(vertex_meta).value();
-  auto maybe_writer = VertexPropertyWriter::Make(vertex_info, "/tmp/");
+  auto vertex_meta_parquet = Yaml::LoadFile(vertex_meta_file_parquet).value();
+  auto vertex_info_parquet = VertexInfo::Load(vertex_meta_parquet).value();
+  auto maybe_writer = VertexPropertyWriter::Make(vertex_info_parquet, "/tmp/");
   REQUIRE(!maybe_writer.has_error());
   auto writer = maybe_writer.value();
 
@@ -98,7 +102,7 @@ TEST_CASE_METHOD(GlobalFixture, "TestVertexPropertyWriter") {
   // Out of range
   REQUIRE(writer->WriteChunk(table, 0).IsInvalid());
   // Invalid chunk id
-  auto chunk = table->Slice(0, vertex_info->GetChunkSize());
+  auto chunk = table->Slice(0, vertex_info_parquet->GetChunkSize());
   REQUIRE(writer->WriteChunk(chunk, -1).IsIndexError());
   // Invalid property group
   Property p1("invalid_property", int32(), false);
@@ -108,10 +112,10 @@ TEST_CASE_METHOD(GlobalFixture, "TestVertexPropertyWriter") {
   std::shared_ptr<arrow::Table> tmp_table =
       table->RenameColumns({"original_id", "firstName", "lastName", "id"})
           .ValueOrDie();
-  auto pg2 = vertex_info->GetPropertyGroup("firstName");
+  auto pg2 = vertex_info_parquet->GetPropertyGroup("firstName");
   REQUIRE(writer->WriteTable(tmp_table, pg2, 0).IsInvalid());
   // Invalid data type
-  auto pg3 = vertex_info->GetPropertyGroup("id");
+  auto pg3 = vertex_info_parquet->GetPropertyGroup("id");
   REQUIRE(writer->WriteTable(tmp_table, pg3, 0).IsTypeError());
 
 #ifdef ARROW_ORC
@@ -154,39 +158,155 @@ TEST_CASE_METHOD(GlobalFixture, "TestVertexPropertyWriter") {
             table2->GetColumnByName("gender")->ToString());
   }
 #endif
-
-  SECTION("TestEdgeChunkWriter") {
-    arrow::Status st;
+  SECTION("TestVertexPropertyWriterWithOption") {
+    // csv file
+    // Construct the writer
+    std::string vertex_meta_file_csv =
+        test_data_dir + "/ldbc_sample/csv/" + "person.vertex.yml";
+    auto vertex_meta_csv = Yaml::LoadFile(vertex_meta_file_csv).value();
+    auto vertex_info_csv = VertexInfo::Load(vertex_meta_csv).value();
+    auto csv_options = WriterOptions::CSVOptionBuilder();
+    csv_options.include_header(true);
+    csv_options.delimiter('|');
+    auto maybe_writer = VertexPropertyWriter::Make(
+        vertex_info_csv, "/tmp/option/", csv_options.build());
+    REQUIRE(!maybe_writer.has_error());
+    auto writer = maybe_writer.value();
+    REQUIRE(writer->WriteTable(table, 0).ok());
+    // read csv file
+    auto parse_options = arrow::csv::ParseOptions::Defaults();
+    parse_options.delimiter = '|';
+    std::shared_ptr<arrow::io::InputStream> chunk0_input =
+        fs->OpenInputStream(
+              "/tmp/option/vertex/person/firstName_lastName_gender/chunk0")
+            .ValueOrDie();
+    auto read_options = arrow::csv::ReadOptions::Defaults();
+    auto csv_reader =
+        arrow::csv::TableReader::Make(arrow::io::default_io_context(),
+                                      chunk0_input, read_options, parse_options,
+                                      arrow::csv::ConvertOptions::Defaults())
+            .ValueOrDie();
+    auto maybe_table = csv_reader->Read();
+    REQUIRE(maybe_table.ok());
+    std::shared_ptr<arrow::Table> csv_table = *maybe_table;
+    REQUIRE(csv_table->num_rows() == vertex_info_csv->GetChunkSize());
+    REQUIRE(csv_table->num_columns() ==
+            static_cast<int>(vertex_info_csv->GetPropertyGroup("firstName")
+                                 ->GetProperties()
+                                 .size()) +
+                1);
+    // type parquet
+    auto options_parquet_Builder = WriterOptions::ParquetOptionBuilder();
+    options_parquet_Builder.compression(arrow::Compression::type::UNCOMPRESSED);
+    options_parquet_Builder.enable_statistics(false);
+    parquet::SortingColumn sc;
+    sc.column_idx = 1;
+    std::vector<::parquet::SortingColumn> columns = {sc};
+    options_parquet_Builder.sorting_columns(columns)
+        .enable_store_decimal_as_integer(true)
+        .max_row_group_length(10);
+    maybe_writer = VertexPropertyWriter::Make(
+        vertex_info_parquet, "/tmp/option/", options_parquet_Builder.build());
+    REQUIRE(!maybe_writer.has_error());
+    writer = maybe_writer.value();
+    REQUIRE(writer->WriteTable(table, 0).ok());
+    // read parquet file
+    std::string parquet_file =
+        "/tmp/option/vertex/person/firstName_lastName_gender/chunk0";
+    auto parquet_fs =
+        arrow::fs::FileSystemFromUriOrPath(parquet_file).ValueOrDie();
+    std::shared_ptr<arrow::io::RandomAccessFile> parquet_input =
+        parquet_fs->OpenInputFile(parquet_file).ValueOrDie();
+    std::unique_ptr<parquet::arrow::FileReader> parquet_reader;
+    auto st = parquet::arrow::OpenFile(
+        parquet_input, arrow::default_memory_pool(), &parquet_reader);
+    REQUIRE(st.ok());
+    std::shared_ptr<arrow::Table> parquet_table;
+    st = parquet_reader->ReadTable(&parquet_table);
+    REQUIRE(st.ok());
+    auto parquet_metadata = parquet_reader->parquet_reader()->metadata();
+    auto row_group_meta = parquet_metadata->RowGroup(0);
+    auto col_meta = row_group_meta->ColumnChunk(0);
+    REQUIRE(row_group_meta->sorting_columns().size() == 1);
+    REQUIRE(row_group_meta->sorting_columns()[0].column_idx == 1);
+    REQUIRE(col_meta->compression() == parquet::Compression::UNCOMPRESSED);
+    REQUIRE(!col_meta->statistics());
+    REQUIRE(parquet_table->num_rows() == vertex_info_parquet->GetChunkSize());
+    REQUIRE(parquet_metadata->num_row_groups() ==
+            parquet_table->num_rows() / 10);
+    REQUIRE(parquet_table->num_columns() ==
+            static_cast<int>(vertex_info_parquet->GetPropertyGroup("firstName")
+                                 ->GetProperties()
+                                 .size() +
+                             1));
+#ifdef ARROW_ORC
+    std::string vertex_meta_file_orc =
+        test_data_dir + "/ldbc_sample/orc/" + "person.vertex.yml";
+    auto vertex_meta_orc = Yaml::LoadFile(vertex_meta_file_orc).value();
+    auto vertex_info_orc = VertexInfo::Load(vertex_meta_orc).value();
+    auto optionsOrcBuilder = WriterOptions::ORCOptionBuilder();
+    optionsOrcBuilder.compression(arrow::Compression::type::ZSTD);
+    maybe_writer = VertexPropertyWriter::Make(vertex_info_orc, "/tmp/option/",
+                                              optionsOrcBuilder.build());
+    REQUIRE(!maybe_writer.has_error());
+    writer = maybe_writer.value();
+    REQUIRE(writer->WriteTable(table, 0).ok());
+    auto fs1 = arrow::fs::FileSystemFromUriOrPath(
+                   "/tmp/option/vertex/person/firstName_lastName_gender/chunk0")
+                   .ValueOrDie();
+    std::shared_ptr<arrow::io::RandomAccessFile> input1 =
+        fs1->OpenInputFile(
+               "/tmp/option/vertex/person/firstName_lastName_gender/chunk0")
+            .ValueOrDie();
     arrow::MemoryPool* pool = arrow::default_memory_pool();
-    std::string path = test_data_dir +
-                       "/ldbc_sample/parquet/edge/person_knows_person/"
-                       "unordered_by_source/adj_list/part0/chunk0";
-    auto fs = arrow::fs::FileSystemFromUriOrPath(path).ValueOrDie();
-    std::shared_ptr<arrow::io::RandomAccessFile> input =
-        fs->OpenInputFile(path).ValueOrDie();
-    std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
-    st = parquet::arrow::OpenFile(input, pool, &arrow_reader);
+    std::unique_ptr<arrow::adapters::orc::ORCFileReader> reader =
+        arrow::adapters::orc::ORCFileReader::Open(input1, pool).ValueOrDie();
     // Read entire file as a single Arrow table
-    std::shared_ptr<arrow::Table> maybe_table;
-    st = arrow_reader->ReadTable(&maybe_table);
-    REQUIRE(st.ok());
+    maybe_table = reader->Read();
+    std::shared_ptr<arrow::Table> table1 = maybe_table.ValueOrDie();
+    REQUIRE(reader->GetCompression() == parquet::Compression::ZSTD);
+    REQUIRE(table1->num_rows() == vertex_info_parquet->GetChunkSize());
+    REQUIRE(table1->num_columns() ==
+            static_cast<int>(vertex_info_parquet->GetPropertyGroup("firstName")
+                                 ->GetProperties()
+                                 .size()) +
+                1);
+#endif
+  }
+}
+TEST_CASE_METHOD(GlobalFixture, "TestEdgeChunkWriter") {
+  arrow::Status st;
+  arrow::MemoryPool* pool = arrow::default_memory_pool();
+  std::string path = test_data_dir +
+                     "/ldbc_sample/parquet/edge/person_knows_person/"
+                     "unordered_by_source/adj_list/part0/chunk0";
+  auto fs = arrow::fs::FileSystemFromUriOrPath(path).ValueOrDie();
+  std::shared_ptr<arrow::io::RandomAccessFile> input =
+      fs->OpenInputFile(path).ValueOrDie();
+  std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
+  st = parquet::arrow::OpenFile(input, pool, &arrow_reader);
+  // Read entire file as a single Arrow table
+  std::shared_ptr<arrow::Table> maybe_table;
+  st = arrow_reader->ReadTable(&maybe_table);
+  REQUIRE(st.ok());
 
-    std::shared_ptr<arrow::Table> table =
-        maybe_table
-            ->RenameColumns(
-                {GeneralParams::kSrcIndexCol, GeneralParams::kDstIndexCol})
-            .ValueOrDie();
-    std::cout << table->schema()->ToString() << std::endl;
-    std::cout << table->num_rows() << ' ' << table->num_columns() << std::endl;
+  std::shared_ptr<arrow::Table> table =
+      maybe_table
+          ->RenameColumns(
+              {GeneralParams::kSrcIndexCol, GeneralParams::kDstIndexCol})
+          .ValueOrDie();
+  std::cout << table->schema()->ToString() << std::endl;
+  std::cout << table->num_rows() << ' ' << table->num_columns() << std::endl;
+  // Construct the writer
+  std::string edge_meta_file_csv =
+      test_data_dir + "/ldbc_sample/csv/" + "person_knows_person.edge.yml";
+  auto edge_meta_csv = Yaml::LoadFile(edge_meta_file_csv).value();
+  auto edge_info_csv = EdgeInfo::Load(edge_meta_csv).value();
+  auto adj_list_type = AdjListType::ordered_by_source;
 
-    // Construct the writer
-    std::string edge_meta_file =
-        test_data_dir + "/ldbc_sample/csv/" + "person_knows_person.edge.yml";
-    auto edge_meta = Yaml::LoadFile(edge_meta_file).value();
-    auto edge_info = EdgeInfo::Load(edge_meta).value();
-    auto adj_list_type = AdjListType::ordered_by_source;
+  SECTION("TestEdgeChunkWriterWithoutOption") {
     auto maybe_writer =
-        EdgeChunkWriter::Make(edge_info, "/tmp/", adj_list_type);
+        EdgeChunkWriter::Make(edge_info_csv, "/tmp/", adj_list_type);
     REQUIRE(!maybe_writer.has_error());
     auto writer = maybe_writer.value();
 
@@ -236,14 +356,14 @@ TEST_CASE_METHOD(GlobalFixture, "TestVertexPropertyWriter") {
     // Invalid adj list type
     auto invalid_adj_list_type = AdjListType::unordered_by_dest;
     auto maybe_writer2 =
-        EdgeChunkWriter::Make(edge_info, "/tmp/", invalid_adj_list_type);
+        EdgeChunkWriter::Make(edge_info_csv, "/tmp/", invalid_adj_list_type);
     REQUIRE(maybe_writer2.has_error());
     // Invalid property group
     Property p1("invalid_property", int32(), false);
     auto pg1 = CreatePropertyGroup({p1}, FileType::CSV);
     REQUIRE(writer->WritePropertyChunk(table, pg1, 0, 0).IsKeyError());
     // Property not found in table
-    auto pg2 = edge_info->GetPropertyGroup("creationDate");
+    auto pg2 = edge_info_csv->GetPropertyGroup("creationDate");
     REQUIRE(writer->WritePropertyChunk(table, pg2, 0, 0).IsInvalid());
     // Required columns not found
     std::shared_ptr<arrow::Table> tmp_table =
@@ -252,5 +372,120 @@ TEST_CASE_METHOD(GlobalFixture, "TestVertexPropertyWriter") {
     // Invalid data type
     REQUIRE(writer->WritePropertyChunk(tmp_table, pg2, 0, 0).IsTypeError());
   }
+  SECTION("TestEdgeChunkWriterWithOption") {
+    WriterOptions::CSVOptionBuilder csv_options_builder;
+    csv_options_builder.include_header(true).delimiter('|');
+    auto maybe_writer =
+        EdgeChunkWriter::Make(edge_info_csv, "/tmp/option/", adj_list_type,
+                              csv_options_builder.build());
+    REQUIRE(!maybe_writer.has_error());
+    auto writer = maybe_writer.value();
+
+    // Valid: Write adj list with options
+    REQUIRE(writer->SortAndWriteAdjListTable(table, 0, 0).ok());
+    // Valid: Write edge count
+    REQUIRE(writer->WriteEdgesNum(0, table->num_rows()).ok());
+    // Valid: Write vertex count
+    REQUIRE(writer->WriteVerticesNum(903).ok());
+
+    // Read back CSV file and check delimiter/header
+    auto parse_options = arrow::csv::ParseOptions::Defaults();
+    parse_options.delimiter = '|';
+    auto read_options = arrow::csv::ReadOptions::Defaults();
+    std::shared_ptr<arrow::io::InputStream> chunk0_input =
+        fs->OpenInputStream(
+              "/tmp/option/edge/person_knows_person/ordered_by_source/adj_list/"
+              "part0/chunk0")
+            .ValueOrDie();
+    auto csv_reader =
+        arrow::csv::TableReader::Make(arrow::io::default_io_context(),
+                                      chunk0_input, read_options, parse_options,
+                                      arrow::csv::ConvertOptions::Defaults())
+            .ValueOrDie();
+    auto maybe_table2 = csv_reader->Read();
+    REQUIRE(maybe_table2.ok());
+    std::shared_ptr<arrow::Table> csv_table = *maybe_table2;
+    REQUIRE(csv_table->num_rows() ==
+            std::min(edge_info_csv->GetChunkSize(), table->num_rows()));
+    REQUIRE(csv_table->num_columns() == table->num_columns());
+
+    // Parquet option
+    std::string edge_meta_file_parquet = test_data_dir +
+                                         "/ldbc_sample/parquet/" +
+                                         "person_knows_person.edge.yml";
+    auto edge_meta_parquet = Yaml::LoadFile(edge_meta_file_parquet).value();
+    auto edge_info_parquet = EdgeInfo::Load(edge_meta_parquet).value();
+    auto optionsBuilderParquet = WriterOptions::ParquetOptionBuilder();
+    optionsBuilderParquet.compression(arrow::Compression::type::UNCOMPRESSED);
+    optionsBuilderParquet.enable_statistics(false);
+    optionsBuilderParquet.enable_store_decimal_as_integer(true);
+    optionsBuilderParquet.max_row_group_length(10);
+    auto maybe_parquet_writer =
+        EdgeChunkWriter::Make(edge_info_parquet, "/tmp/option/", adj_list_type,
+                              optionsBuilderParquet.build());
+    REQUIRE(!maybe_parquet_writer.has_error());
+    auto parquet_writer = maybe_parquet_writer.value();
+    REQUIRE(parquet_writer->SortAndWriteAdjListTable(table, 0, 0).ok());
+    std::string parquet_file =
+        "/tmp/option/edge/person_knows_person/ordered_by_source/adj_list/part0/"
+        "chunk0";
+    auto parquet_fs =
+        arrow::fs::FileSystemFromUriOrPath(parquet_file).ValueOrDie();
+    std::shared_ptr<arrow::io::RandomAccessFile> parquet_input =
+        parquet_fs->OpenInputFile(parquet_file).ValueOrDie();
+    std::unique_ptr<parquet::arrow::FileReader> parquet_reader;
+    auto st = parquet::arrow::OpenFile(
+        parquet_input, arrow::default_memory_pool(), &parquet_reader);
+    REQUIRE(st.ok());
+    std::shared_ptr<arrow::Table> parquet_table;
+    st = parquet_reader->ReadTable(&parquet_table);
+    REQUIRE(st.ok());
+    auto parquet_metadata = parquet_reader->parquet_reader()->metadata();
+    auto row_group_meta = parquet_metadata->RowGroup(0);
+    auto col_meta = row_group_meta->ColumnChunk(0);
+    REQUIRE(col_meta->compression() == parquet::Compression::UNCOMPRESSED);
+    REQUIRE(!col_meta->statistics());
+    REQUIRE(parquet_table->num_rows() ==
+            std::min(table->num_rows(), edge_info_parquet->GetChunkSize()));
+    REQUIRE(parquet_metadata->num_row_groups() ==
+            parquet_table->num_rows() / 10 + 1);
+    REQUIRE(parquet_table->num_columns() ==
+            static_cast<int>(table->num_columns()));
+
+#ifdef ARROW_ORC
+    // ORC option
+    std::string edge_meta_file_orc =
+        test_data_dir + "/ldbc_sample/orc/" + "person_knows_person.edge.yml";
+    auto edge_meta_orc = Yaml::LoadFile(edge_meta_file_orc).value();
+    auto edge_info_orc = EdgeInfo::Load(edge_meta_orc).value();
+    auto optionsBuilderOrc = WriterOptions::ORCOptionBuilder();
+    optionsBuilderOrc.compression(arrow::Compression::type::ZSTD);
+    auto maybe_orc_writer =
+        EdgeChunkWriter::Make(edge_info_orc, "/tmp/option/", adj_list_type,
+                              optionsBuilderOrc.build());
+    REQUIRE(!maybe_orc_writer.has_error());
+    auto orc_writer = maybe_orc_writer.value();
+    REQUIRE(orc_writer->SortAndWriteAdjListTable(table, 0, 0).ok());
+    auto orc_fs = arrow::fs::FileSystemFromUriOrPath(
+                      "/tmp/option/edge/person_knows_person/ordered_by_source/"
+                      "adj_list/part0/chunk0")
+                      .ValueOrDie();
+    std::shared_ptr<arrow::io::RandomAccessFile> orc_input =
+        orc_fs
+            ->OpenInputFile(
+                "/tmp/option/edge/person_knows_person/ordered_by_source/"
+                "adj_list/part0/chunk0")
+            .ValueOrDie();
+    arrow::MemoryPool* pool = arrow::default_memory_pool();
+    std::unique_ptr<arrow::adapters::orc::ORCFileReader> orc_reader =
+        arrow::adapters::orc::ORCFileReader::Open(orc_input, pool).ValueOrDie();
+    auto maybe_orc_table = orc_reader->Read();
+    REQUIRE(maybe_orc_table.ok());
+    std::shared_ptr<arrow::Table> orc_table = *maybe_orc_table;
+    REQUIRE(orc_reader->GetCompression() == parquet::Compression::ZSTD);
+    REQUIRE(orc_table->num_rows() == table->num_rows());
+    REQUIRE(orc_table->num_columns() == table->num_columns());
+#endif
+  }
 }
 }  // namespace graphar
diff --git a/cpp/test/test_builder.cc b/cpp/test/test_builder.cc
index 63ebebc8..555a5669 100644
--- a/cpp/test/test_builder.cc
+++ b/cpp/test/test_builder.cc
@@ -37,7 +37,7 @@
 
 #include <catch2/catch_test_macros.hpp>
 namespace graphar {
-TEST_CASE_METHOD(GlobalFixture, "test_vertices_builder") {
+TEST_CASE_METHOD(GlobalFixture, "Test_vertices_builder") {
   std::cout << "Test vertex builder" << std::endl;
 
   // construct vertex builder
diff --git a/docs/libraries/cpp/getting-started.md b/docs/libraries/cpp/getting-started.md
index a9b9fa03..c0403a77 100644
--- a/docs/libraries/cpp/getting-started.md
+++ b/docs/libraries/cpp/getting-started.md
@@ -341,9 +341,11 @@ When you have the data ready, you can read the file into `arrow::Table` by using
 ```
You can export label table to disk in parquet format, and read it back into memory in the following way.
 ``` cpp
+  auto options_parquet_builder = WriterOptions::ParquetOptionBuilder();
+  options_parquet_builder.compression(arrow::Compression::type::UNCOMPRESSED);
   // write arrow table as parquet chunk
   auto maybe_writer =
-      VertexPropertyWriter::Make(vertex_info, test_data_dir + "/ldbc/parquet/");
+      VertexPropertyWriter::Make(vertex_info, test_data_dir + "/ldbc/parquet/", options_parquet_builder.build());
   REQUIRE(!maybe_writer.has_error());
   auto writer = maybe_writer.value();
   REQUIRE(writer->WriteTable(table, 0).ok());

