This is an automated email from the ASF dual-hosted git repository.

weibin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-graphar.git


The following commit(s) were added to refs/heads/main by this push:
     new c4a5f72  [BugFix][C++] Cast to correct schema when get chunk with 
property reader (#456)
c4a5f72 is described below

commit c4a5f72bbf07b969d1a76f8dde9c29ac4c1d24eb
Author: Weibin Zeng <[email protected]>
AuthorDate: Fri Apr 19 15:24:02 2024 +0800

    [BugFix][C++] Cast to correct schema when get chunk with property reader 
(#456)
    
    ## Proposed changes
    As issue #219 describes, the chunk table obtained from the arrow chunk
    reader may not have the same types as the schema.
    This change fixes the problem by casting the chunk table to the expected
    schema; if the cast fails, an error is raised.
    
    ## Further comments
    fixes #219
    ---------
    
    Signed-off-by: acezen <[email protected]>
---
 cpp/include/gar/reader/arrow_chunk_reader.h |   3 +
 cpp/src/arrow_chunk_reader.cc               | 110 ++++++++++++++++++++++++++++
 cpp/src/filesystem.cc                       |   4 +-
 cpp/test/test_arrow_chunk_reader.cc         |  36 +++++++++
 4 files changed, 152 insertions(+), 1 deletion(-)

diff --git a/cpp/include/gar/reader/arrow_chunk_reader.h 
b/cpp/include/gar/reader/arrow_chunk_reader.h
index ca86628..908ed64 100644
--- a/cpp/include/gar/reader/arrow_chunk_reader.h
+++ b/cpp/include/gar/reader/arrow_chunk_reader.h
@@ -30,6 +30,7 @@
 // forward declaration
 namespace arrow {
 class Array;
+class Schema;
 class Table;
 }  // namespace arrow
 
@@ -145,6 +146,7 @@ class VertexPropertyArrowChunkReader {
   IdType seek_id_;
   IdType chunk_num_;
   IdType vertex_num_;
+  std::shared_ptr<arrow::Schema> schema_;
   std::shared_ptr<arrow::Table> chunk_table_;
   util::FilterOptions filter_options_;
   std::shared_ptr<FileSystem> fs_;
@@ -500,6 +502,7 @@ class AdjListPropertyArrowChunkReader {
   std::string prefix_;
   IdType vertex_chunk_index_, chunk_index_;
   IdType seek_offset_;
+  std::shared_ptr<arrow::Schema> schema_;
   std::shared_ptr<arrow::Table> chunk_table_;
   util::FilterOptions filter_options_;
   IdType vertex_chunk_num_, chunk_num_;
diff --git a/cpp/src/arrow_chunk_reader.cc b/cpp/src/arrow_chunk_reader.cc
index a70a66b..08360c7 100644
--- a/cpp/src/arrow_chunk_reader.cc
+++ b/cpp/src/arrow_chunk_reader.cc
@@ -18,12 +18,14 @@
  */
 
 #include "arrow/api.h"
+#include "arrow/compute/api.h"
 
 #include "gar/graph_info.h"
 #include "gar/reader/arrow_chunk_reader.h"
 #include "gar/util/adj_list_type.h"
 #include "gar/util/data_type.h"
 #include "gar/util/filesystem.h"
+#include "gar/util/general_params.h"
 #include "gar/util/reader_util.h"
 #include "gar/util/result.h"
 #include "gar/util/status.h"
@@ -31,6 +33,97 @@
 
 namespace graphar {
 
+namespace {
+
+// Build the arrow schema that a chunk of the given property group should
+// have.
+//
+// pg: property group whose properties become the schema fields, in the order
+//     returned by GetProperties().
+// contain_index_column: when true, an int64 field named
+//     GeneralParams::kVertexIndexCol is placed ahead of the property fields.
+// Returns the assembled schema.
+Result<std::shared_ptr<arrow::Schema>> PropertyGroupToSchema(
+    const std::shared_ptr<PropertyGroup> pg,
+    bool contain_index_column = false) {
+  const auto& properties = pg->GetProperties();
+  std::vector<std::shared_ptr<arrow::Field>> schema_fields;
+  schema_fields.reserve(properties.size() + (contain_index_column ? 1 : 0));
+
+  // The optional index column always comes first.
+  if (contain_index_column) {
+    auto index_field = std::make_shared<arrow::Field>(
+        GeneralParams::kVertexIndexCol, arrow::int64());
+    schema_fields.push_back(index_field);
+  }
+
+  // One field per property, typed with the arrow equivalent of its type.
+  for (const auto& property : properties) {
+    auto arrow_type = DataType::DataTypeToArrowDataType(property.type);
+    schema_fields.push_back(
+        std::make_shared<arrow::Field>(property.name, arrow_type));
+  }
+  return arrow::schema(schema_fields);
+}
+
+// Cast one array to `to_type` with arrow's compute cast.
+//
+// in: array to convert.
+// to_type: requested result type.
+// out: receives the converted array; left untouched when the cast fails.
+// Returns Status::OK() on success, otherwise the status produced by
+// GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN for the failed arrow cast.
+Status GeneralCast(const std::shared_ptr<arrow::Array>& in,
+                   const std::shared_ptr<arrow::DataType>& to_type,
+                   std::shared_ptr<arrow::Array>* out) {
+  std::shared_ptr<arrow::Array> casted;
+  GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(casted,
+                                       arrow::compute::Cast(*in, to_type));
+  *out = std::move(casted);
+  return Status::OK();
+}
+
+// Convert a string array with 32-bit offsets into one with 64-bit offsets.
+//
+// The conversion works on a copy of the input's array data, so `in` itself is
+// not modified. Only the offsets buffer (buffers[1]) is rebuilt, with every
+// offset widened to the large-string offset type; the type of the copy is
+// replaced by `to_type`.
+//
+// in: source array; expected to be a utf8 string array.
+// to_type: type stamped onto the result; expected to be large_utf8.
+// out: receives the converted array.
+// Returns Status::OK() on success, or a non-OK status when the new offsets
+// buffer cannot be built or the resulting array fails full validation.
+Status CastStringToLargeString(const std::shared_ptr<arrow::Array>& in,
+                               const std::shared_ptr<arrow::DataType>& to_type,
+                               std::shared_ptr<arrow::Array>* out) {
+  using from_offset_type = typename arrow::StringArray::offset_type;
+  using to_string_offset_type = typename arrow::LargeStringArray::offset_type;
+
+  auto array_data = in->data()->Copy();
+  array_data->type = to_type;
+
+  // Hold our own reference so the source offsets stay alive while being read.
+  const std::shared_ptr<arrow::Buffer> offsets = array_data->buffers[1];
+  // A missing offsets buffer is not dereferenced; whether such an array is
+  // acceptable is decided by ValidateFull() below.
+  if (offsets != nullptr) {
+    const auto* narrow_offsets =
+        reinterpret_cast<const from_offset_type*>(offsets->data());
+    const size_t offset_count = offsets->size() / sizeof(from_offset_type);
+    // The element-wise copy widens every offset to the large offset type.
+    std::vector<to_string_offset_type> wide_offsets(
+        narrow_offsets, narrow_offsets + offset_count);
+
+    arrow::TypedBufferBuilder<to_string_offset_type> buffer_builder;
+    RETURN_NOT_ARROW_OK(
+        buffer_builder.Append(wide_offsets.data(), wide_offsets.size()));
+    std::shared_ptr<arrow::Buffer> buffer;
+    RETURN_NOT_ARROW_OK(buffer_builder.Finish(&buffer));
+    array_data->buffers[1] = buffer;
+  }
+
+  *out = arrow::MakeArray(array_data);
+  RETURN_NOT_ARROW_OK((*out)->ValidateFull());
+  return Status::OK();
+}
+
+// Helper function to cast an arrow::Table to a target schema.
+//
+// Columns are matched to schema fields by position. A column whose type
+// already equals the target type is reused as is; any other column is
+// converted chunk by chunk, either with the arrow compute cast or, for
+// utf8 -> large_utf8, with CastStringToLargeString.
+//
+// table: table to convert.
+// schema: schema the result must have.
+// out_table: receives the converted table; may point at the same
+//     shared_ptr as `table`, since it is only written once all reads of
+//     `table` are done.
+// Returns Status::OK() on success. Returns a non-OK status when the column
+// count differs from the schema's field count, when no conversion exists
+// for a column, or when converting a chunk fails.
+Status CastTableWithSchema(const std::shared_ptr<arrow::Table>& table,
+                           const std::shared_ptr<arrow::Schema>& schema,
+                           std::shared_ptr<arrow::Table>* out_table) {
+  if (table->schema()->Equals(*schema)) {
+    // Nothing to convert: hand the table back untouched.
+    *out_table = table;
+    return Status::OK();
+  }
+  if (table->num_columns() != schema->num_fields()) {
+    // Fields are paired by index below, so the counts have to agree.
+    return Status::Invalid("Cannot cast a table with ", table->num_columns(),
+                           " columns to a schema with ", schema->num_fields(),
+                           " fields.");
+  }
+
+  std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
+  columns.reserve(table->num_columns());
+  for (int i = 0; i < table->num_columns(); ++i) {
+    auto column = table->column(i);
+    auto from_t = table->field(i)->type();
+    auto to_t = schema->field(i)->type();
+    if (from_t->Equals(to_t)) {
+      columns.push_back(column);
+      continue;
+    }
+
+    // The conversion depends only on the two types, so choose it once per
+    // column and fail up front instead of silently dropping chunks.
+    const bool use_general_cast = arrow::compute::CanCast(*from_t, *to_t);
+    const bool use_string_widening = !use_general_cast &&
+                                     from_t->Equals(arrow::utf8()) &&
+                                     to_t->Equals(arrow::large_utf8());
+    if (!use_general_cast && !use_string_widening) {
+      return Status::TypeError("Cannot cast column '", table->field(i)->name(),
+                               "' from ", from_t->ToString(), " to ",
+                               to_t->ToString(), ".");
+    }
+
+    std::vector<std::shared_ptr<arrow::Array>> chunks;
+    chunks.reserve(column->num_chunks());
+    // process cast for each chunk
+    for (int j = 0; j < column->num_chunks(); ++j) {
+      std::shared_ptr<arrow::Array> out;
+      if (use_general_cast) {
+        GAR_RETURN_NOT_OK(GeneralCast(column->chunk(j), to_t, &out));
+      } else {
+        GAR_RETURN_NOT_OK(
+            CastStringToLargeString(column->chunk(j), to_t, &out));
+      }
+      chunks.push_back(out);
+    }
+    columns.push_back(std::make_shared<arrow::ChunkedArray>(chunks, to_t));
+  }
+
+  *out_table = arrow::Table::Make(schema, columns);
+  return Status::OK();
+}
+}  // namespace
+
 VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader(
     const std::shared_ptr<VertexInfo>& vertex_info,
     const std::shared_ptr<PropertyGroup>& property_group,
@@ -39,6 +132,7 @@ 
VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader(
       property_group_(std::move(property_group)),
       chunk_index_(0),
       seek_id_(0),
+      schema_(nullptr),
       chunk_table_(nullptr),
       filter_options_(options) {
   GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_));
@@ -49,6 +143,8 @@ 
VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader(
                             util::GetVertexChunkNum(prefix_, vertex_info));
   GAR_ASSIGN_OR_RAISE_ERROR(vertex_num_,
                             util::GetVertexNum(prefix_, vertex_info_));
+  GAR_ASSIGN_OR_RAISE_ERROR(schema_,
+                            PropertyGroupToSchema(property_group_, true));
 }
 
 Status VertexPropertyArrowChunkReader::seek(IdType id) {
@@ -79,6 +175,11 @@ VertexPropertyArrowChunkReader::GetChunk() {
     GAR_ASSIGN_OR_RAISE(
         chunk_table_, fs_->ReadFileToTable(path, 
property_group_->GetFileType(),
                                            filter_options_));
+    // TODO(acezen): filter pushdown doesn't support cast schema now
+    if (schema_ != nullptr && filter_options_.filter == nullptr) {
+      GAR_RETURN_NOT_OK(
+          CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
+    }
   }
   IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize();
   return chunk_table_->Slice(row_offset);
@@ -469,6 +570,7 @@ 
AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader(
       vertex_chunk_index_(0),
       chunk_index_(0),
       seek_offset_(0),
+      schema_(nullptr),
       chunk_table_(nullptr),
       filter_options_(options),
       chunk_num_(-1) /* -1 means uninitialized */ {
@@ -480,6 +582,8 @@ 
AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader(
   GAR_ASSIGN_OR_RAISE_ERROR(
       vertex_chunk_num_,
       util::GetVertexChunkNum(prefix_, edge_info_, adj_list_type_));
+  GAR_ASSIGN_OR_RAISE_ERROR(schema_,
+                            PropertyGroupToSchema(property_group, false));
 }
 
 AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader(
@@ -491,6 +595,7 @@ 
AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader(
       vertex_chunk_index_(other.vertex_chunk_index_),
       chunk_index_(other.chunk_index_),
       seek_offset_(other.seek_offset_),
+      schema_(other.schema_),
       chunk_table_(nullptr),
       filter_options_(other.filter_options_),
       vertex_chunk_num_(other.vertex_chunk_num_),
@@ -593,6 +698,11 @@ AdjListPropertyArrowChunkReader::GetChunk() {
     GAR_ASSIGN_OR_RAISE(
         chunk_table_, fs_->ReadFileToTable(path, 
property_group_->GetFileType(),
                                            filter_options_));
+    // TODO(acezen): filter pushdown doesn't support cast schema now
+    if (schema_ != nullptr && filter_options_.filter == nullptr) {
+      GAR_RETURN_NOT_OK(
+          CastTableWithSchema(chunk_table_, schema_, &chunk_table_));
+    }
   }
   IdType row_offset = seek_offset_ - chunk_index_ * edge_info_->GetChunkSize();
   return chunk_table_->Slice(row_offset);
diff --git a/cpp/src/filesystem.cc b/cpp/src/filesystem.cc
index b4d2042..bc76e73 100644
--- a/cpp/src/filesystem.cc
+++ b/cpp/src/filesystem.cc
@@ -283,7 +283,9 @@ Result<std::shared_ptr<FileSystem>> FileSystemFromUriOrPath(
         auto arrow_fs,
         arrow::fs::FileSystemFromUriOrPath(uri_string, out_path));
     // arrow would delete the last slash, so use uri string
-    *out_path = uri_string;
+    if (out_path != nullptr) {
+      *out_path = uri_string;
+    }
     return std::make_shared<FileSystem>(arrow_fs);
   }
 
diff --git a/cpp/test/test_arrow_chunk_reader.cc 
b/cpp/test/test_arrow_chunk_reader.cc
index 1113c1e..0ba5c73 100644
--- a/cpp/test/test_arrow_chunk_reader.cc
+++ b/cpp/test/test_arrow_chunk_reader.cc
@@ -24,7 +24,9 @@
 #include "./util.h"
 #include "gar/reader/arrow_chunk_reader.h"
 #include "gar/util/adj_list_type.h"
+#include "gar/util/data_type.h"
 #include "gar/util/expression.h"
+#include "gar/util/filesystem.h"
 #include "gar/util/general_params.h"
 
 #define CATCH_CONFIG_MAIN
@@ -96,6 +98,40 @@ TEST_CASE("ArrowChunkReader") {
       REQUIRE(reader->seek(1024).IsIndexError());
     }
 
+    SECTION("CastDataType") {
+      std::string prefix = root + "/modern_graph/";
+      std::string vertex_info_path = prefix + "person.vertex.yml";
+      std::cout << "Vertex info path: " << vertex_info_path << std::endl;
+      auto fs = FileSystemFromUriOrPath(prefix).value();
+      auto yaml_content =
+          fs->ReadFileToValue<std::string>(vertex_info_path).value();
+      std::cout << yaml_content << std::endl;
+      auto maybe_vertex_info = VertexInfo::Load(yaml_content);
+      REQUIRE(maybe_vertex_info.status().ok());
+      auto vertex_info = maybe_vertex_info.value();
+      std::cout << vertex_info->Dump().value() << std::endl;
+      auto pg = vertex_info->GetPropertyGroup("id");
+      REQUIRE(pg != nullptr);
+      REQUIRE(pg->GetProperties().size() == 1);
+      auto origin_property = pg->GetProperties()[0];
+      REQUIRE(origin_property.type->Equals(int64()));
+
+      // change to int32_t
+      Property new_property("id", int32(), origin_property.is_primary,
+                            origin_property.is_nullable);
+      auto new_pg = CreatePropertyGroup({new_property}, pg->GetFileType(),
+                                        pg->GetPrefix());
+      auto maybe_reader =
+          VertexPropertyArrowChunkReader::Make(vertex_info, new_pg, prefix);
+      REQUIRE(maybe_reader.status().ok());
+      auto reader = maybe_reader.value();
+      auto result = reader->GetChunk();
+      REQUIRE(!result.has_error());
+      auto table = result.value();
+      REQUIRE(table->schema()->GetFieldByName("id")->type()->id() ==
+              arrow::Type::INT32);
+    }
+
     SECTION("PropertyPushDown") {
       std::string filter_property = "gender";
       auto filter = _Equal(_Property(filter_property), _Literal("female"));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to