yangxk1 commented on code in PR #776:
URL: https://github.com/apache/incubator-graphar/pull/776#discussion_r2467717827


##########
cpp/extensions/kuzu-extension/src/utils/graphar_utils.cpp:
##########
@@ -0,0 +1,159 @@
+#include "utils/graphar_utils.h"
+
+namespace kuzu {
+namespace graphar_extension {
+
+std::string getFirstToken(const std::string& input) {
+    size_t pos = input.find('-');
+    if (pos == std::string::npos) {
+        return input;
+    }
+    return input.substr(0, pos);
+}
+
+void getYamlNameWithoutGrapharLabel(const std::string& filePath) {
+    const std::string grapharLabel = DEFAULT_GRAPHAR_LABEL;
+    if (ends_with(filePath, grapharLabel)) {
+        // remove the trailing ".graphar"
+        const_cast<std::string&>(filePath).erase(filePath.size() - 
grapharLabel.size());
+    }
+}
+
+bool ends_with(const std::string& s, const std::string& suffix) {
+    if (s.size() < suffix.size())
+        return false;
+    return s.compare(s.size() - suffix.size(), suffix.size(), suffix) == 0;
+}
+
+bool parse_is_edge(const std::string& path) {
+    if (path.empty()) {
+        throw std::invalid_argument("empty path");
+    }
+
+    // Extract the file name (substring after the last '/' or '\\')
+    const size_t pos = path.find_last_of("/\\");
+    const std::string filename = (pos == std::string::npos) ? path : 
path.substr(pos + 1);
+
+    if (filename.empty()) {
+        throw std::invalid_argument("path has no filename");
+    }
+
+    // Convert to uppercase for case-insensitive comparison
+    const std::string up = common::StringUtils::getUpper(filename);

Review Comment:
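   Suggestion: use more descriptive variable names. For example, `up` could be
   renamed to `upperFilename` so that its meaning is clear without reading the
   comment above it.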
   
   



##########
cpp/extensions/kuzu-extension/src/function/graphar_scan_bindfunc.cpp:
##########
@@ -0,0 +1,515 @@
+#include "function/graphar_scan.h"
+
+namespace kuzu {
+namespace graphar_extension {
+
+using namespace function;
+using namespace common;
+
+// Vertex setter maker for properties
+template<typename T>
+VertexColumnSetter makeTypedVertexSetter(uint64_t fieldIdx, std::string 
colName) {
+    return [fieldIdx, colName = std::move(colName)](graphar::VertexIter& it,
+               function::TableFuncOutput& output, kuzu::common::idx_t row) {
+        auto res = it.property<T>(colName);
+        auto& vec = output.dataChunk.getValueVectorMutable(fieldIdx);
+        vec.setValue(row, res.value());
+    };
+}
+
+// Vertex setter maker for internal_id
+VertexColumnSetter makeInternalIdVertexSetter(uint64_t fieldIdx, std::string 
colName) {
+    return [fieldIdx, colName = std::move(colName)](graphar::VertexIter& it,
+               function::TableFuncOutput& output, kuzu::common::idx_t row) {
+        auto id = it.id();
+        auto& vec = output.dataChunk.getValueVectorMutable(fieldIdx);
+        vec.setValue(row, static_cast<int64_t>(id));
+    };
+}
+
+template<>
+VertexColumnSetter makeTypedVertexSetter<list_entry_t>([[maybe_unused]] 
uint64_t fieldIdx,
+    [[maybe_unused]] std::string colName) {
+    throw NotImplementedException("List type is not supported in graphar 
scan.");
+}
+
+// Edge setter maker for properties
+template<typename T>
+EdgeColumnSetter makeTypedEdgeSetter(uint64_t fieldIdx, std::string colName) {
+    return [fieldIdx, colName = std::move(colName)](graphar::EdgeIter& it,
+               function::TableFuncOutput& output, kuzu::common::idx_t row,
+               [[maybe_unused]] std::shared_ptr<graphar::VerticesCollection> 
unused) {
+        auto res = it.property<T>(colName);
+        auto& vec = output.dataChunk.getValueVectorMutable(fieldIdx);
+        vec.setValue(row, res.value());
+    };
+}
+
+template<>
+EdgeColumnSetter makeTypedEdgeSetter<list_entry_t>([[maybe_unused]] uint64_t 
fieldIdx,
+    [[maybe_unused]] std::string colName) {
+    throw NotImplementedException("List type is not supported in graphar 
scan.");
+}
+
+// Edge setter for "from" (source) and "to" (destination)
+template<typename T>
+EdgeColumnSetter makeFromSetter(uint64_t fieldIdx, std::string colName) {
+    return [fieldIdx, colName = std::move(colName)](graphar::EdgeIter& it,
+               function::TableFuncOutput& output, kuzu::common::idx_t row,
+               std::shared_ptr<graphar::VerticesCollection> from_vertices) {
+        graphar::IdType src = it.source();
+        auto vertex_it = from_vertices->find(src);
+        auto res = vertex_it.property<T>(colName);
+        auto& vec = output.dataChunk.getValueVectorMutable(fieldIdx);
+        vec.setValue(row, res.value());
+    };
+}
+
+template<typename T>
+EdgeColumnSetter makeToSetter(uint64_t fieldIdx, std::string colName) {
+    return [fieldIdx, colName = std::move(colName)](graphar::EdgeIter& it,
+               function::TableFuncOutput& output, kuzu::common::idx_t row,
+               std::shared_ptr<graphar::VerticesCollection> to_vertices) {
+        graphar::IdType dst = it.destination();
+        auto vertex_it = to_vertices->find(dst);
+        auto res = vertex_it.property<T>(colName);
+        auto& vec = output.dataChunk.getValueVectorMutable(fieldIdx);
+        vec.setValue(row, res.value());
+    };
+}
+
+// Edge setter for "internal_from" (source) and "internal_to" (destination)
+EdgeColumnSetter makeInternalFromSetter(uint64_t fieldIdx) {
+    return
+        [fieldIdx](graphar::EdgeIter& it, function::TableFuncOutput& output,
+            kuzu::common::idx_t row, 
std::shared_ptr<graphar::VerticesCollection> from_vertices) {
+            graphar::IdType src = it.source();
+            auto vertex_it = from_vertices->find(src);
+            auto res = vertex_it.id();
+            auto& vec = output.dataChunk.getValueVectorMutable(fieldIdx);
+            vec.setValue(row, static_cast<int64_t>(res));
+        };
+}
+
+EdgeColumnSetter makeInternalToSetter(uint64_t fieldIdx) {
+    return [fieldIdx](graphar::EdgeIter& it, function::TableFuncOutput& output,
+               kuzu::common::idx_t row, 
std::shared_ptr<graphar::VerticesCollection> to_vertices) {
+        graphar::IdType dst = it.destination();
+        auto vertex_it = to_vertices->find(dst);
+        auto res = vertex_it.id();
+        auto& vec = output.dataChunk.getValueVectorMutable(fieldIdx);
+        vec.setValue(row, static_cast<int64_t>(res));
+    };
+}
+
+static const std::unordered_map<LogicalTypeID,
+    std::function<VertexColumnSetter(uint64_t, std::string)>>
+    vertexSetterFactory = {
+        {LogicalTypeID::INT64,
+            [](uint64_t idx, std::string col) {
+                return makeTypedVertexSetter<int64_t>(idx, std::move(col));
+            }},
+        {LogicalTypeID::INT32,
+            [](uint64_t idx, std::string col) {
+                return makeTypedVertexSetter<int32_t>(idx, std::move(col));
+            }},
+        {LogicalTypeID::DOUBLE,
+            [](uint64_t idx, std::string col) {
+                return makeTypedVertexSetter<double>(idx, std::move(col));
+            }},
+        {LogicalTypeID::FLOAT,
+            [](uint64_t idx, std::string col) {
+                return makeTypedVertexSetter<float>(idx, std::move(col));
+            }},
+        {LogicalTypeID::STRING,
+            [](uint64_t idx, std::string col) {
+                return makeTypedVertexSetter<std::string>(idx, std::move(col));
+            }},
+        {LogicalTypeID::BOOL,
+            [](uint64_t idx, std::string col) {
+                return makeTypedVertexSetter<bool>(idx, std::move(col));
+            }},
+        {LogicalTypeID::DATE,
+            [](uint64_t idx, std::string col) {
+                return makeTypedVertexSetter<date_t>(idx, std::move(col));
+            }},
+        {LogicalTypeID::TIMESTAMP,
+            [](uint64_t idx, std::string col) {
+                return makeTypedVertexSetter<timestamp_t>(idx, std::move(col));
+            }},
+        {LogicalTypeID::LIST,
+            [](uint64_t idx, std::string col) {
+                return makeTypedVertexSetter<list_entry_t>(idx, 
std::move(col));
+            }},
+};
+
+static const std::unordered_map<LogicalTypeID,
+    std::function<EdgeColumnSetter(uint64_t, std::string)>>
+    edgeSetterFactory = {
+        {LogicalTypeID::INT64,
+            [](uint64_t idx, std::string col) {
+                return makeTypedEdgeSetter<int64_t>(idx, std::move(col));
+            }},
+        {LogicalTypeID::INT32,
+            [](uint64_t idx, std::string col) {
+                return makeTypedEdgeSetter<int32_t>(idx, std::move(col));
+            }},
+        {LogicalTypeID::DOUBLE,
+            [](uint64_t idx, std::string col) {
+                return makeTypedEdgeSetter<double>(idx, std::move(col));
+            }},
+        {LogicalTypeID::FLOAT,
+            [](uint64_t idx, std::string col) {
+                return makeTypedEdgeSetter<float>(idx, std::move(col));
+            }},
+        {LogicalTypeID::STRING,
+            [](uint64_t idx, std::string col) {
+                return makeTypedEdgeSetter<std::string>(idx, std::move(col));
+            }},
+        {LogicalTypeID::BOOL,
+            [](uint64_t idx, std::string col) {
+                return makeTypedEdgeSetter<bool>(idx, std::move(col));
+            }},
+        {LogicalTypeID::DATE,
+            [](uint64_t idx, std::string col) {
+                return makeTypedEdgeSetter<date_t>(idx, std::move(col));
+            }},
+        {LogicalTypeID::TIMESTAMP,
+            [](uint64_t idx, std::string col) {
+                return makeTypedEdgeSetter<timestamp_t>(idx, std::move(col));
+            }},
+        {LogicalTypeID::LIST,
+            [](uint64_t idx, std::string col) {
+                return makeTypedEdgeSetter<list_entry_t>(idx, std::move(col));
+            }},
+};
+
+static const std::unordered_map<LogicalTypeID,
+    std::function<EdgeColumnSetter(uint64_t, std::string)>>
+    fromSetterFactory = {
+        {LogicalTypeID::INT64,
+            [](uint64_t idx, std::string col) {
+                return makeFromSetter<int64_t>(idx, std::move(col));
+            }},
+        {LogicalTypeID::INT32,
+            [](uint64_t idx, std::string col) {
+                return makeFromSetter<int32_t>(idx, std::move(col));
+            }},
+        {LogicalTypeID::DOUBLE,
+            [](uint64_t idx, std::string col) {
+                return makeFromSetter<double>(idx, std::move(col));
+            }},
+        {LogicalTypeID::FLOAT,
+            [](uint64_t idx, std::string col) {
+                return makeFromSetter<float>(idx, std::move(col));
+            }},
+        {LogicalTypeID::STRING,
+            [](uint64_t idx, std::string col) {
+                return makeFromSetter<std::string>(idx, std::move(col));
+            }},
+};
+
+static const std::unordered_map<LogicalTypeID,
+    std::function<EdgeColumnSetter(uint64_t, std::string)>>
+    toSetterFactory = {
+        {LogicalTypeID::INT64,
+            [](uint64_t idx, std::string col) {
+                return makeToSetter<int64_t>(idx, std::move(col));
+            }},
+        {LogicalTypeID::INT32,
+            [](uint64_t idx, std::string col) {
+                return makeToSetter<int32_t>(idx, std::move(col));
+            }},
+        {LogicalTypeID::DOUBLE,
+            [](uint64_t idx, std::string col) {
+                return makeToSetter<double>(idx, std::move(col));
+            }},
+        {LogicalTypeID::FLOAT,
+            [](uint64_t idx, std::string col) { return 
makeToSetter<float>(idx, std::move(col)); }},
+        {LogicalTypeID::STRING,
+            [](uint64_t idx, std::string col) {
+                return makeToSetter<std::string>(idx, std::move(col));
+            }},
+};
+
+static void autoDetectVertexSchema([[maybe_unused]] main::ClientContext* 
context,
+    std::shared_ptr<graphar::GraphInfo> graph_info, std::string table_name,
+    std::vector<LogicalType>& types, std::vector<std::string>& names) {
+    auto vertexInfo = graph_info->GetVertexInfo(table_name);
+    if (!vertexInfo) {
+        throw BinderException("GraphAr's VertexInfo " + table_name + " does 
not exist as vertex.");
+    }
+
+    names.push_back("internal_id");
+    types.push_back(LogicalType::INT64());
+
+    // Construct the types and names from the vertex info.
+    for (auto& property_group : vertexInfo->GetPropertyGroups()) {
+        for (const auto& property : property_group->GetProperties()) {
+            names.push_back(property.name);
+            types.push_back(grapharTypeToKuzuType(property.type));
+        }
+    }
+}
+
+static void autoDetectEdgeSchema([[maybe_unused]] main::ClientContext* context,
+    std::shared_ptr<graphar::GraphInfo> graph_info, const std::string& 
table_name,
+    std::vector<LogicalType>& types, std::vector<std::string>& names,
+    std::unordered_map<std::string, std::string>& edges_from_to_mapping) {
+    std::string src_type, edge_type, dst_type;
+    if (!tryParseEdgeTableName(table_name, src_type, edge_type, dst_type)) {
+        throw BinderException("Edge table_name must be specified in 
`src_edge_dst` format "
+                              "(supported separators: '_'). Given: " +
+                              table_name);
+    }
+
+    // Use GraphInfo to get EdgeInfo
+    auto edgeInfo = graph_info->GetEdgeInfo(src_type, edge_type, dst_type);
+    if (!edgeInfo) {
+        throw BinderException("GraphAr's EdgeInfo does not exist for " + 
table_name +
+                              " (parsed as " + src_type + REGULAR_SEPARATOR + 
edge_type +
+                              REGULAR_SEPARATOR + dst_type + ").");
+    }
+
+    auto from_vertex_info = graph_info->GetVertexInfo(src_type);
+    auto to_vertex_info = graph_info->GetVertexInfo(dst_type);
+    if (!from_vertex_info) {
+        throw BinderException(
+            "GraphAr's VertexInfo does not exist for edge source type " + 
src_type + ".");
+    }
+    if (!to_vertex_info) {
+        throw BinderException(
+            "GraphAr's VertexInfo does not exist for edge destination type " + 
dst_type + ".");
+    }
+
+    // Prepend from/to columns
+    names.push_back(FROM_COL_NAME);
+    // Find from column mapping name in the from_vertex_info's primary keys
+    for (const auto& propertyGroup : from_vertex_info->GetPropertyGroups()) {
+        for (const auto& property : propertyGroup->GetProperties()) {
+            if (property.is_primary) {
+                edges_from_to_mapping.insert({FROM_COL_NAME, property.name});
+                break;
+            }
+        }
+    }
+    types.push_back(LogicalType::INT64());
+
+    names.push_back(TO_COL_NAME);
+    // Find to column mapping name in the to_vertex_info's primary keys
+    for (const auto& propertyGroup : to_vertex_info->GetPropertyGroups()) {
+        for (const auto& property : propertyGroup->GetProperties()) {
+            if (property.is_primary) {
+                edges_from_to_mapping.insert({TO_COL_NAME, property.name});
+                break;
+            }
+        }
+    }
+    types.push_back(LogicalType::INT64());
+
+    // Add internal_from/internal_to edge columns (optional)
+    // internal_from/to is optional, because they are stored in internal_id of 
vertices.
+    // names.push_back(INTERNAL_FROM_COL_NAME);
+    // types.push_back(LogicalType::INT64());
+    // names.push_back(INTERNAL_TO_COL_NAME);
+    // types.push_back(LogicalType::INT64());
+
+    // Add the edge properties except from/to
+    for (auto& property_group : edgeInfo->GetPropertyGroups()) {
+        for (const auto& property : property_group->GetProperties()) {
+            names.push_back(property.name);
+            types.push_back(grapharTypeToKuzuType(property.type));
+        }
+    }
+}
+
+GrapharScanBindData::GrapharScanBindData(binder::expression_vector columns,
+    common::FileScanInfo fileScanInfo, main::ClientContext* context,
+    std::shared_ptr<graphar::GraphInfo> graph_info, std::string table_name,
+    std::vector<std::string> column_names, 
std::vector<kuzu::common::LogicalType> column_types,
+    bool is_edge)
+    : ScanFileBindData{std::move(columns), 0 /* numRows */, 
std::move(fileScanInfo), context},
+      graph_info{std::move(graph_info)},
+      column_info{std::make_shared<KuzuColumnInfo>(column_names)},
+      table_name{std::move(table_name)}, column_names{std::move(column_names)},
+      column_types{std::move(column_types)}, is_edge(is_edge) {
+    if (!is_edge) {
+        this->vertex_column_setters.reserve(this->column_names.size());
+        for (size_t i = 0; i < this->column_names.size(); ++i) {
+            if (StringUtils::caseInsensitiveEquals(this->column_names[i], 
"internal_id")) {
+                // special-case internal_id
+                
this->vertex_column_setters.push_back(makeInternalIdVertexSetter(
+                    getFieldIdx(this->column_names[i]), 
this->column_names[i]));
+                continue;
+            }
+            auto typeID = this->column_types[i].getLogicalTypeID();
+            uint64_t fieldIdx = getFieldIdx(this->column_names[i]);
+            KU_ASSERT(fieldIdx != UINT64_MAX);
+            auto it = vertexSetterFactory.find(typeID);
+            if (it == vertexSetterFactory.end()) {
+                throw NotImplementedException{
+                    "Unsupported column type in GrapharScan bind (vertex): " +
+                    std::to_string((int)typeID)};
+            }
+            this->vertex_column_setters.push_back(it->second(fieldIdx, 
this->column_names[i]));
+        }
+    } else {
+        this->edge_column_setters.reserve(this->column_names.size());
+        for (size_t i = 0; i < this->column_names.size(); ++i) {
+            uint64_t fieldIdx = getFieldIdx(this->column_names[i]);
+            KU_ASSERT(fieldIdx != UINT64_MAX);
+            // special-case from/to
+            if (StringUtils::caseInsensitiveEquals(this->column_names[i], 
FROM_COL_NAME)) {
+                LogicalTypeID typeID = 
this->column_types[i].getLogicalTypeID();
+                auto it = fromSetterFactory.find(typeID);
+                if (it == fromSetterFactory.end()) {
+                    throw NotImplementedException{
+                        "Unsupported column type in GrapharScan bind (from 
edge): " +
+                        std::to_string((int)typeID)};
+                }
+                this->edge_column_setters.push_back(it->second(fieldIdx, 
"id"));
+                continue;
+            }
+
+            if (StringUtils::caseInsensitiveEquals(this->column_names[i], 
TO_COL_NAME)) {
+                LogicalTypeID typeID = 
this->column_types[i].getLogicalTypeID();
+                auto it = toSetterFactory.find(typeID);
+                if (it == toSetterFactory.end()) {
+                    throw NotImplementedException{
+                        "Unsupported column type in GrapharScan bind (to 
edge): " +
+                        std::to_string((int)typeID)};
+                }
+                this->edge_column_setters.push_back(it->second(fieldIdx, 
"id"));
+                continue;
+            }
+
+            if (StringUtils::caseInsensitiveEquals(this->column_names[i], 
INTERNAL_FROM_COL_NAME)) {
+                // special-case internal_from
+                this->edge_column_setters.push_back(
+                    
makeInternalFromSetter(getFieldIdx(this->column_names[i])));
+                continue;
+            }
+
+            if (StringUtils::caseInsensitiveEquals(this->column_names[i], 
INTERNAL_TO_COL_NAME)) {
+                // special-case internal_to
+                this->edge_column_setters.push_back(
+                    makeInternalToSetter(getFieldIdx(this->column_names[i])));
+                continue;
+            }
+
+            // regular edge property
+            LogicalTypeID typeID = this->column_types[i].getLogicalTypeID();
+            auto it = edgeSetterFactory.find(typeID);
+            if (it == edgeSetterFactory.end()) {
+                throw NotImplementedException{
+                    "Unsupported column type in GrapharScan bind (edge): " +
+                    std::to_string((int)typeID)};
+            }
+            this->edge_column_setters.push_back(it->second(fieldIdx, 
this->column_names[i]));
+        }
+    }
+    this->max_threads = context->getMaxNumThreadForExec();
+}
+
+KuzuColumnInfo::KuzuColumnInfo(std::vector<std::string> column_names) {
+    this->colNames = std::move(column_names);
+    idx_t colIdx = 0;
+    for (auto& colName : this->colNames) {
+        colNameToIdx.insert({colName, colIdx++});
+    }
+}
+
+uint64_t KuzuColumnInfo::getFieldIdx(std::string fieldName) const {
+    // For a small number of keys, probing a vector is faster than lookups in 
an unordered_map.
+    if (colNames.size() < 24) {
+        auto iter = std::find(colNames.begin(), colNames.end(), fieldName);
+        if (iter != colNames.end()) {
+            return iter - colNames.begin();
+        }
+    } else {
+        auto itr = colNameToIdx.find(fieldName);
+        if (itr != colNameToIdx.end()) {
+            return itr->second;
+        }
+    }
+    // From and to are case-insensitive for backward compatibility.
+    if (StringUtils::caseInsensitiveEquals(fieldName, FROM_COL_NAME)) {
+        return colNameToIdx.at(FROM_COL_NAME);
+    } else if (StringUtils::caseInsensitiveEquals(fieldName, TO_COL_NAME)) {
+        return colNameToIdx.at(TO_COL_NAME);
+    }
+    return UINT64_MAX;
+}
+
+std::unique_ptr<TableFuncBindData> bindFunc(main::ClientContext* context,
+    const TableFuncBindInput* input) {
+    auto scanInput = 
ku_dynamic_cast<ExtraScanTableFuncBindInput*>(input->extraInput.get());
+    std::string absolute_path = scanInput->fileScanInfo.getFilePath(0);
+    if (absolute_path.empty()) {
+        throw BinderException("GraphAr scan requires a valid file path.");
+    }
+
+    std::string table_name = 
scanInput->fileScanInfo.options.at("table_name").strVal;
+
+    // Load graph info from the file path
+    auto graph_info = graphar::GraphInfo::Load(absolute_path).value();
+    if (!graph_info) {
+        throw BinderException("GraphAr's GraphInfo could not be loaded from " 
+ absolute_path);
+    }
+
+    std::vector<LogicalType> column_types;
+    std::vector<std::string> column_names;
+    std::unordered_map<std::string, std::string> edges_from_to_mapping;
+    bool is_edge = false;
+
+    auto vertex_infos = graph_info->GetVertexInfos();
+    auto edge_infos = graph_info->GetEdgeInfos();
+
+    // Try vertex first
+    for (const auto& v_info : vertex_infos) {
+        if (v_info->GetType() == table_name) {
+            autoDetectVertexSchema(context, graph_info, table_name, 
column_types, column_names);
+            is_edge = false;
+            goto TAIL;

Review Comment:
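   Please avoid `goto` here. Once a matching vertex type has been found, the
   jump to `TAIL` can be replaced with structured control flow, for example a
   `found` flag combined with `break`, or a small helper function that returns
   as soon as the schema has been detected.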



##########
cpp/extensions/kuzu-extension/src/utils/graphar_utils.cpp:
##########
@@ -0,0 +1,159 @@
+#include "utils/graphar_utils.h"
+
+namespace kuzu {
+namespace graphar_extension {
+
+std::string getFirstToken(const std::string& input) {
+    size_t pos = input.find('-');
+    if (pos == std::string::npos) {
+        return input;
+    }
+    return input.substr(0, pos);
+}
+
+void getYamlNameWithoutGrapharLabel(const std::string& filePath) {
+    const std::string grapharLabel = DEFAULT_GRAPHAR_LABEL;
+    if (ends_with(filePath, grapharLabel)) {
+        // remove the trailing ".graphar"
+        const_cast<std::string&>(filePath).erase(filePath.size() - 
grapharLabel.size());

Review Comment:
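   Returning a new `std::string` is safer than using `const_cast`. Modifying an
   object through a `const` reference is undefined behavior if the caller's
   object is actually `const`, and callers do not expect an argument passed as
   `const std::string&` to be changed.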



##########
cpp/extensions/kuzu-extension/src/function/graphar_export.cpp:
##########
@@ -0,0 +1,403 @@
+#include "function/graphar_export.h"
+
+using namespace kuzu::function;
+using namespace kuzu::common;
+
+namespace kuzu {
+namespace graphar_extension {
+
+void initSharedState(ExportFuncSharedState& sharedState, main::ClientContext& 
context,
+    const ExportFuncBindData& bindData) {
+    sharedState.init(context, bindData);
+}
+
+std::shared_ptr<ExportFuncSharedState> createSharedStateFunc() {
+    return std::make_shared<ExportGrapharSharedState>();
+}
+
+std::unique_ptr<ExportFuncLocalState> initLocalState(main::ClientContext&,
+    const ExportFuncBindData&, std::vector<bool>) {
+    return std::make_unique<ExportGrapharLocalState>();
+}
+
+void sinkFunc(ExportFuncSharedState&, ExportFuncLocalState& localState,
+    [[maybe_unused]] const ExportFuncBindData& bindData,
+    std::vector<std::shared_ptr<ValueVector>> inputVectors) {
+    auto& grapharLocalState = localState.cast<ExportGrapharLocalState>();
+
+    // ATTENTION: postpone buffer creation until first sink call, because we 
can't
+    // know the schema before that (It's hard for bindData to get the type 
info).
+    if (!grapharLocalState.buffer) {
+        // schema of input vectors
+        std::vector<PropMeta> schema_to_create;
+
+        // fill the schema and create buffer in local state.
+        KU_ASSERT(inputVectors.size() == bindData.columnNames.size());
+        for (size_t i = 0; i < inputVectors.size(); i++) {
+            schema_to_create.push_back(PropMeta{bindData.columnNames[i],
+                kuzuTypeToGrapharType(inputVectors[i]->dataType), 
Cardinality::SINGLE});
+        }
+
+        grapharLocalState.buffer = 
std::make_shared<WriteRowsBuffer>(std::move(schema_to_create));
+    }
+
+    auto& buffer = grapharLocalState.buffer;
+    const auto& schema = grapharLocalState.buffer->Schema();
+
+    if (inputVectors.size() != schema.size()) {
+        throw common::RuntimeException("inputVectors size != schema size");
+    }
+
+    // Compute the number of logical rows in the current batch (supporting 
mixed flat / unflat
+    // vectors) A flat vector is treated as selSize = 1 (broadcasted), while 
an unflat vector uses
+    // its own selSize.
+    size_t num_rows = 1;
+    for (size_t c = 0; c < inputVectors.size(); ++c) {
+        auto& v = inputVectors[c];
+        if (!v->state->isFlat()) {
+            auto s = 
static_cast<size_t>(v->state->getSelVector().getSelSize());
+            if (s > num_rows) {
+                num_rows = s;
+            }
+        }
+    }
+
+    // Optional: if prefer a fail-fast policy for inconsistent non-flat 
columns,
+    // you can disable the “max” strategy above and instead check that all 
non-flat
+    // columns have the same selSize, throwing an exception otherwise. Example:
+    //   size_t expected = 0;
+    //   for (...) if (!v->state->isFlat()) { if (expected==0) expected = s; 
else if (expected != s)
+    //   throw ...; }
+    // The current implementation adopts a pad-null strategy to improve 
robustness
+    // and compatibility.
+
+    for (size_t logicalRow = 0; logicalRow < num_rows; ++logicalRow) {
+        size_t rid = buffer->NewRow();
+
+        for (size_t col = 0; col < schema.size(); ++col) {
+            const auto& meta = schema[col];
+            auto& vecPtr = inputVectors[col];
+
+            // If the vector is flat: always use sel[0].
+            // If unflat: use sel[logicalRow] if logicalRow < selSize,
+            // otherwise treat as missing (null).
+            uint32_t physPos = 0;
+            if (vecPtr->state->isFlat()) {
+                physPos = vecPtr->state->getSelVector()[0];
+            } else {
+                auto selSize = 
static_cast<size_t>(vecPtr->state->getSelVector().getSelSize());
+                if (logicalRow >= selSize) {
+                    // pad-null: this column has no value for the current 
logicalRow,
+                    // leave as monostate (not written).
+                    continue;
+                }
+                physPos = vecPtr->state->getSelVector()[logicalRow];
+            }
+
+            // use physical position to check for null
+            if (vecPtr->isNull(physPos)) {
+                continue; // keep as monostate
+            }
+
+            switch (meta.type) {
+            case Type::INT64:
+            case Type::TIMESTAMP:
+                buffer->SetProperty(rid, meta.name, 
Scalar(vecPtr->getValue<int64_t>(physPos)));
+                break;
+            case Type::INT32:
+            case Type::DATE:
+                buffer->SetProperty(rid, meta.name, 
Scalar(vecPtr->getValue<int32_t>(physPos)));
+                break;
+            case Type::DOUBLE:
+                buffer->SetProperty(rid, meta.name, 
Scalar(vecPtr->getValue<double>(physPos)));
+                break;
+            case Type::FLOAT:
+                buffer->SetProperty(rid, meta.name, 
Scalar(vecPtr->getValue<float>(physPos)));
+                break;
+            case Type::STRING:
+                buffer->SetProperty(rid, meta.name,
+                    
Scalar(vecPtr->getValue<ku_string_t>(physPos).getAsString()));
+                break;
+            case Type::BOOL:
+                buffer->SetProperty(rid, meta.name, 
Scalar(vecPtr->getValue<bool>(physPos)));
+                break;
+            default:
+                throw common::RuntimeException(
+                    common::stringFormat("Unsupported type for property '{}'", 
meta.name));
+            }
+        }
+    }
+}
+
+void combineFunc(ExportFuncSharedState& sharedState, ExportFuncLocalState& 
localState) {
+    auto& grapharSharedState = sharedState.cast<ExportGrapharSharedState>();
+    auto& grapharLocalState = localState.cast<ExportGrapharLocalState>();
+
+    // nothing to combine if local buffer is null, just return.
+    // this can happen if sink haven't been called yet.
+    if (!grapharLocalState.buffer) {
+        return;
+    }
+
+    auto& buffer = grapharLocalState.buffer;
+    auto& schema = grapharLocalState.buffer->Schema();
+
+    // Helper: find index of a property by a list of candidate names 
(case-sensitive).
+    auto findIndexByCandidates =
+        [&schema](const std::vector<std::string>& candidates) -> 
std::optional<size_t> {
+        for (const auto& cand : candidates) {
+            for (size_t i = 0; i < schema.size(); ++i) {
+                if (schema[i].name == cand)
+                    return i;
+            }
+        }
+        return std::nullopt;
+    };
+
+    // If exporting edges, try to locate src/dst indices once before looping.
+    std::optional<size_t> srcIdxOpt, dstIdxOpt;
+    if (grapharSharedState.is_edge) {
+        srcIdxOpt = findIndexByCandidates(srcCandidates);
+        dstIdxOpt = findIndexByCandidates(dstCandidates);
+        if (!srcIdxOpt || !dstIdxOpt) {
+            // If not found, try a fallback: assume first two columns are 
src/dst
+            if (schema.size() >= 2) {
+                srcIdxOpt = 0;
+                dstIdxOpt = 1;
+            } else {
+                throw common::RuntimeException{common::stringFormat(
+                    "Cannot locate src/dst properties in schema for edge 
export.")};
+            }
+        }
+    }
+
+    std::lock_guard<std::mutex> lck{grapharSharedState.mtx};
+
+    // Iterate over rows and convert each row into a Vertex or Edge.
+    auto& rows = buffer->GetRows();
+    for (auto& row : rows) {
+        if (!grapharSharedState.is_edge) {
+            // Vertex export
+            graphar::builder::Vertex vertex;
+
+            // For each property in schema, if it's set, push to vertex.
+            for (size_t i = 0; i < schema.size(); ++i) {
+                const auto& meta = schema[i];
+                const auto& cell = row.props[i];
+
+                // Skip unset properties.
+                if (std::holds_alternative<std::monostate>(cell))
+                    continue;
+
+                if (meta.card == Cardinality::SINGLE) {
+                    // Single-valued property
+                    const Scalar& s = std::get<Scalar>(cell);
+                    std::visit(
+                        [&](auto&& val) {
+                            using T = std::decay_t<decltype(val)>;
+                            if constexpr (std::is_same_v<T, int64_t>) {
+                                vertex.AddProperty(meta.name, val);
+                            } else if constexpr (std::is_same_v<T, int32_t>) {
+                                vertex.AddProperty(meta.name, val);
+                            } else if constexpr (std::is_same_v<T, double>) {
+                                vertex.AddProperty(meta.name, val);
+                            } else if constexpr (std::is_same_v<T, float>) {
+                                vertex.AddProperty(meta.name, val);
+                            } else if constexpr (std::is_same_v<T, 
std::string>) {
+                                vertex.AddProperty(meta.name, val);
+                            } else if constexpr (std::is_same_v<T, bool>) {
+                                vertex.AddProperty(meta.name, val);
+                            } else {
+                                // unreachable
+                            }
+                        },
+                        s);
+                } else {
+                    throw common::NotImplementedException{
+                        "GraphAr edge multi-valued properties are not 
implemented."};
+                }
+            }
+
+            // Add constructed vertex to verticesBuilder and check status.
+            if (!grapharSharedState.verticesBuilder) {
+                throw common::RuntimeException{"verticesBuilder is null."};
+            }
+            auto st_v = grapharSharedState.verticesBuilder->AddVertex(vertex);
+            if (!st_v.ok()) {
+                throw common::RuntimeException{
+                    common::stringFormat("AddVertex failed: {}", 
st_v.message())};
+            }
+
+        } else {
+            // Edge export
+            // retrieve src/dst indices (guaranteed present above)
+            size_t srcIdx = *srcIdxOpt;
+            size_t dstIdx = *dstIdxOpt;
+
+            // src/dst must be set and scalar
+            const auto& srcCell = row.props[srcIdx];
+            const auto& dstCell = row.props[dstIdx];
+            if (std::holds_alternative<std::monostate>(srcCell) ||
+                std::holds_alternative<std::monostate>(dstCell)) {
+                throw common::RuntimeException{"edge row missing src or dst 
value."};
+            }
+            if (!std::holds_alternative<Scalar>(srcCell) ||
+                !std::holds_alternative<Scalar>(dstCell)) {
+                throw common::RuntimeException{"edge src/dst must be scalar."};
+            }
+
+            // Extract src/dst as integer (prefer int64_t, allow int32_t)
+            int64_t srcId = 0, dstId = 0;
+            const Scalar& ssrc = std::get<Scalar>(srcCell);
+            const Scalar& sdst = std::get<Scalar>(dstCell);
+            // visitor for extracting integer id
+            auto extractId = [](const Scalar& sc) -> int64_t {
+                if (std::holds_alternative<int64_t>(sc))
+                    return std::get<int64_t>(sc);
+                if (std::holds_alternative<int32_t>(sc))
+                    return static_cast<int64_t>(std::get<int32_t>(sc));
+                // also allow strings that look like integers? Not by default: 
throw
+                throw common::RuntimeException{"edge endpoint is not integer 
type."};
+            };
+            srcId = extractId(ssrc);
+            dstId = extractId(sdst);
+
+            // construct edge
+            graphar::builder::Edge edge(srcId, dstId);
+
+            // add remaining properties except src/dst themselves
+            for (size_t i = 0; i < schema.size(); ++i) {
+                if (i == srcIdx || i == dstIdx)
+                    continue; // skip endpoints
+                const auto& meta = schema[i];
+                const auto& cell = row.props[i];
+                if (std::holds_alternative<std::monostate>(cell))
+                    continue;
+
+                if (meta.card == Cardinality::SINGLE) {
+                    const Scalar& s = std::get<Scalar>(cell);
+                    std::visit(
+                        [&](auto&& val) {
+                            using T = std::decay_t<decltype(val)>;
+                            if constexpr (std::is_same_v<T, int64_t>) {
+                                edge.AddProperty(meta.name, val);
+                            } else if constexpr (std::is_same_v<T, int32_t>) {
+                                edge.AddProperty(meta.name, val);
+                            } else if constexpr (std::is_same_v<T, double>) {
+                                edge.AddProperty(meta.name, val);
+                            } else if constexpr (std::is_same_v<T, float>) {
+                                edge.AddProperty(meta.name, val);
+                            } else if constexpr (std::is_same_v<T, 
std::string>) {
+                                edge.AddProperty(meta.name, val);
+                            } else if constexpr (std::is_same_v<T, bool>) {
+                                edge.AddProperty(meta.name, val);
+                            }
+                        },
+                        s);
+                } else {
+                    throw common::NotImplementedException{
+                        "GraphAr edge multi-valued properties are not 
implemented."};
+                }
+            }
+
+            // add edge to edgesBuilder and check status
+            if (!grapharSharedState.edgesBuilder) {
+                throw common::RuntimeException{"edgesBuilder is null."};
+            }
+            auto st_e = grapharSharedState.edgesBuilder->AddEdge(edge);
+            if (!st_e.ok()) {
+                throw common::RuntimeException{
+                    common::stringFormat("AddEdge failed: {}", 
st_e.message())};
+            }
+        }
+    }
+}
+
+// Finalizes a GraphAr export for the table described by the shared state.
+//
+// When `is_edge` is set, Dump() and then Clear() are invoked on the edges
+// builder; otherwise the same two calls are made on the vertices builder.
+// Throws common::RuntimeException if the selected builder is null or if
+// its Dump() call returns a non-ok status.
+void finalizeFunc(ExportFuncSharedState& sharedState) {
+    auto& state = sharedState.cast<ExportGrapharSharedState>();
+
+    if (state.is_edge) {
+        // Edge table: only the edges builder is touched.
+        auto& builder = state.edgesBuilder;
+        if (!builder) {
+            throw common::RuntimeException{
+                "edgesBuilder is null in finalize."};
+        }
+        auto dumpStatus = builder->Dump();
+        if (!dumpStatus.ok()) {
+            throw common::RuntimeException{common::stringFormat(
+                "EdgesBuilder Finalize failed: {}", dumpStatus.message())};
+        }
+        builder->Clear();
+        return;
+    }
+
+    // Vertex table: only the vertices builder is touched.
+    auto& builder = state.verticesBuilder;
+    if (!builder) {
+        throw common::RuntimeException{
+            "verticesBuilder is null in finalize."};
+    }
+    auto dumpStatus = builder->Dump();
+    if (!dumpStatus.ok()) {
+        throw common::RuntimeException{common::stringFormat(
+            "VerticesBuilder Finalize failed: {}", dumpStatus.message())};
+    }
+    builder->Clear();
+}
+
+void ExportGrapharSharedState::init([[maybe_unused]] main::ClientContext& 
context,
+    const ExportFuncBindData& bindData) {
+    const ExportGrapharBindData& grapharBindData = 
bindData.constCast<ExportGrapharBindData>();
+    std::shared_ptr<GraphInfo> graph_info = grapharBindData.graphInfo;
+    GrapharExportOptions exportOptions = grapharBindData.exportOptions;
+    std::string tableName = grapharBindData.tableName;
+    std::string targetDir = grapharBindData.targetDir;
+    ValidateLevel validateLevel = grapharBindData.validateLevel;
+
+    auto vertex_infos = graph_info->GetVertexInfos();
+    auto edge_infos = graph_info->GetEdgeInfos();
+
+    // Try vertex first
+    for (const auto& v_info : vertex_infos) {
+        if (v_info->GetType() == tableName) {
+            vertexInfo = v_info;
+            verticesBuilder = 
std::make_shared<builder::VerticesBuilder>(vertexInfo, targetDir, 0L,
+                exportOptions.wopt, validateLevel);
+            is_edge = false;
+            return;
+        }
+    }
+
+    // Try edge if not vertex
+    for (const auto& e_info : edge_infos) {
+        std::string src_type = e_info->GetSrcType();
+        std::string edge_type = e_info->GetEdgeType();
+        std::string dst_type = e_info->GetDstType();
+        std::string full_edge_name =
+            src_type + REGULAR_SEPARATOR + edge_type + REGULAR_SEPARATOR + 
dst_type;
+        if (full_edge_name == tableName) {
+            edgeInfo = e_info;
+            // edgesBuilder = 
std::make_shared<builder::EdgesBuilder>(edgeInfo, targetDir,
+            //     AdjListType::ordered_by_source, 903, exportOptions.wopt, 
validateLevel);
+            edgesBuilder = std::make_shared<builder::EdgesBuilder>(edgeInfo, 
targetDir,
+                AdjListType::ordered_by_source, 903);

Review Comment:
   Is the hard-coded `903` here leftover test data?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to