This is an automated email from the ASF dual-hosted git repository.

mtaha pushed a commit to branch PG17
in repository https://gitbox.apache.org/repos/asf/age.git

commit 827be72b77618f5c9fca6c8ef1bbc046dba04da2
Author: Muhammad Taha Naveed <[email protected]>
AuthorDate: Mon Jan 19 22:21:02 2026 +0500

    Replace libcsv with pg COPY for csv loading (#2310)
    
    - Commit also adds permission checks
    - Resolves a critical memory spike when loading large files
    - Use pg's COPY infrastructure (BeginCopyFrom, NextCopyFromRawFields)
      for 64KB buffered CSV parsing instead of libcsv
    - Add byte-based flush threshold (64KB) matching COPY behavior for memory safety
    - Use heap_multi_insert with BulkInsertState for optimized batch inserts
    - Add per-batch memory context to prevent memory growth during large loads
    - Remove libcsv dependency (libcsv.c, csv.h)
    - Improves loading performance by 15-20%
    - No previous regression tests were impacted
    - Added regression tests for permissions/rls
    Assisted-by AI
---
 Makefile                                |   1 -
 regress/expected/age_load.out           | 189 +++++++++++
 regress/expected/index.out              |  12 +-
 regress/sql/age_load.sql                | 125 ++++++++
 regress/sql/index.sql                   |   2 +-
 src/backend/utils/load/ag_load_edges.c  | 388 +++++++++++-----------
 src/backend/utils/load/ag_load_labels.c | 381 +++++++++++-----------
 src/backend/utils/load/age_load.c       | 248 +++++++++++++--
 src/backend/utils/load/libcsv.c         | 549 --------------------------------
 src/include/utils/load/ag_load_edges.h  |  52 ++-
 src/include/utils/load/ag_load_labels.h |  50 +--
 src/include/utils/load/age_load.h       |  27 +-
 src/include/utils/load/csv.h            | 108 -------
 13 files changed, 1000 insertions(+), 1132 deletions(-)

diff --git a/Makefile b/Makefile
index ffad7d6a..a8faa2bb 100644
--- a/Makefile
+++ b/Makefile
@@ -69,7 +69,6 @@ OBJS = src/backend/age.o \
        src/backend/utils/load/ag_load_labels.o \
        src/backend/utils/load/ag_load_edges.o \
        src/backend/utils/load/age_load.o \
-       src/backend/utils/load/libcsv.o \
        src/backend/utils/name_validation.o \
        src/backend/utils/ag_guc.o
 
diff --git a/regress/expected/age_load.out b/regress/expected/age_load.out
index 55d1ff1d..1f76c31c 100644
--- a/regress/expected/age_load.out
+++ b/regress/expected/age_load.out
@@ -454,6 +454,195 @@ NOTICE:  graph "agload_conversion" has been dropped
  
 (1 row)
 
+--
+-- Test security and permissions
+--
+SELECT create_graph('agload_security');
+NOTICE:  graph "agload_security" has been created
+ create_graph 
+--------------
+ 
+(1 row)
+
+SELECT create_vlabel('agload_security', 'Person1');
+NOTICE:  VLabel "Person1" has been created
+ create_vlabel 
+---------------
+ 
+(1 row)
+
+SELECT create_vlabel('agload_security', 'Person2');
+NOTICE:  VLabel "Person2" has been created
+ create_vlabel 
+---------------
+ 
+(1 row)
+
+SELECT create_elabel('agload_security', 'SecEdge');
+NOTICE:  ELabel "SecEdge" has been created
+ create_elabel 
+---------------
+ 
+(1 row)
+
+--
+-- Test 1: File read permission (pg_read_server_files role)
+--
+-- Create a user without pg_read_server_files role
+CREATE USER load_test_user;
+GRANT USAGE ON SCHEMA ag_catalog TO load_test_user;
+-- This should fail because load_test_user doesn't have pg_read_server_files
+SET ROLE load_test_user;
+SELECT load_labels_from_file('agload_security', 'Person1', 
'age_load/conversion_vertices.csv', true);
+ERROR:  permission denied to LOAD from a file
+DETAIL:  Only roles with privileges of the "pg_read_server_files" role may 
LOAD from a file.
+SELECT load_edges_from_file('agload_security', 'SecEdge', 
'age_load/conversion_edges.csv');
+ERROR:  permission denied to LOAD from a file
+DETAIL:  Only roles with privileges of the "pg_read_server_files" role may 
LOAD from a file.
+RESET ROLE;
+-- Grant pg_read_server_files and try again - should fail on table permission 
now
+GRANT pg_read_server_files TO load_test_user;
+--
+-- Test 2: Table INSERT permission (ACL_INSERT)
+--
+-- User has file read permission but no INSERT on the label table
+SET ROLE load_test_user;
+SELECT load_labels_from_file('agload_security', 'Person1', 
'age_load/conversion_vertices.csv', true);
+ERROR:  permission denied for table Person1
+SELECT load_edges_from_file('agload_security', 'SecEdge', 
'age_load/conversion_edges.csv');
+ERROR:  permission denied for table SecEdge
+RESET ROLE;
+-- Grant INSERT permission and try again - should succeed
+GRANT USAGE ON SCHEMA agload_security TO load_test_user;
+GRANT INSERT ON agload_security."Person1" TO load_test_user;
+GRANT INSERT ON agload_security."SecEdge" TO load_test_user;
+GRANT UPDATE ON SEQUENCE agload_security."Person1_id_seq" TO load_test_user;
+GRANT UPDATE ON SEQUENCE agload_security."SecEdge_id_seq" TO load_test_user;
+GRANT SELECT ON ag_catalog.ag_label TO load_test_user;
+GRANT SELECT ON ag_catalog.ag_graph TO load_test_user;
+SET ROLE load_test_user;
+SELECT load_labels_from_file('agload_security', 'Person1', 
'age_load/conversion_vertices.csv', true);
+ load_labels_from_file 
+-----------------------
+ 
+(1 row)
+
+SELECT load_edges_from_file('agload_security', 'SecEdge', 
'age_load/conversion_edges.csv');
+ load_edges_from_file 
+----------------------
+ 
+(1 row)
+
+RESET ROLE;
+-- Verify data was loaded
+SELECT COUNT(*) FROM agload_security."Person1";
+ count 
+-------
+     6
+(1 row)
+
+SELECT COUNT(*) FROM agload_security."SecEdge";
+ count 
+-------
+     6
+(1 row)
+
+-- cleanup
+DELETE FROM agload_security."Person1";
+DELETE FROM agload_security."SecEdge";
+--
+-- Test 3: Row-Level Security (RLS)
+--
+-- Enable RLS on the label tables
+ALTER TABLE agload_security."Person1" ENABLE ROW LEVEL SECURITY;
+ALTER TABLE agload_security."SecEdge" ENABLE ROW LEVEL SECURITY;
+-- Switch to load_test_user
+SET ROLE load_test_user;
+-- Loading should fail when RLS is enabled
+SELECT load_labels_from_file('agload_security', 'Person1', 
'age_load/conversion_vertices.csv', true);
+ERROR:  LOAD from file is not supported with row-level security
+HINT:  Use Cypher CREATE clause instead.
+SELECT load_edges_from_file('agload_security', 'SecEdge', 
'age_load/conversion_edges.csv');
+ERROR:  LOAD from file is not supported with row-level security
+HINT:  Use Cypher CREATE clause instead.
+RESET ROLE;
+-- Disable RLS and try again - should succeed
+ALTER TABLE agload_security."Person1" DISABLE ROW LEVEL SECURITY;
+ALTER TABLE agload_security."SecEdge" DISABLE ROW LEVEL SECURITY;
+SELECT load_labels_from_file('agload_security', 'Person1', 
'age_load/conversion_vertices.csv', true);
+ load_labels_from_file 
+-----------------------
+ 
+(1 row)
+
+SELECT load_edges_from_file('agload_security', 'SecEdge', 
'age_load/conversion_edges.csv');
+ load_edges_from_file 
+----------------------
+ 
+(1 row)
+
+-- Verify data was loaded
+SELECT COUNT(*) FROM agload_security."Person1";
+ count 
+-------
+     6
+(1 row)
+
+SELECT COUNT(*) FROM agload_security."SecEdge";
+ count 
+-------
+     6
+(1 row)
+
+-- cleanup
+DELETE FROM agload_security."Person1";
+DELETE FROM agload_security."SecEdge";
+--
+-- Test 4: Constraint checking (CHECK constraint)
+--
+-- Add constraint on vertex properties - fail if bool property is false
+ALTER TABLE agload_security."Person1" ADD CONSTRAINT check_bool_true
+    CHECK ((properties->>'"bool"')::boolean = true);
+-- This should fail - constraint violation
+SELECT load_labels_from_file('agload_security', 'Person1', 
'age_load/conversion_vertices.csv', true);
+ERROR:  new row for relation "Person1" violates check constraint 
"check_bool_true"
+DETAIL:  Failing row contains (844424930131970, {"id": "2", "bool": "false", 
"__id__": 2, "string": "John", "num...).
+-- Add constraint on edge properties - fail if bool property is false
+ALTER TABLE agload_security."SecEdge" ADD CONSTRAINT check_bool_true
+    CHECK ((properties->>'"bool"')::boolean = true);
+-- This should fail - some edges have bool = false
+SELECT load_edges_from_file('agload_security', 'SecEdge', 
'age_load/conversion_edges.csv');
+ERROR:  new row for relation "SecEdge" violates check constraint 
"check_bool_true"
+DETAIL:  Failing row contains (1407374883553294, 844424930131969, 
1125899906842625, {"bool": "false", "string": "John", "numeric": "-2"}).
+-- cleanup
+ALTER TABLE agload_security."Person1" DROP CONSTRAINT check_bool_true;
+ALTER TABLE agload_security."SecEdge" DROP CONSTRAINT check_bool_true;
+--
+-- Cleanup
+--
+REVOKE ALL ON agload_security."Person1" FROM load_test_user;
+REVOKE ALL ON agload_security."SecEdge" FROM load_test_user;
+REVOKE ALL ON SEQUENCE agload_security."Person1_id_seq" FROM load_test_user;
+REVOKE ALL ON SEQUENCE agload_security."SecEdge_id_seq" FROM load_test_user;
+REVOKE ALL ON ag_catalog.ag_label FROM load_test_user;
+REVOKE ALL ON ag_catalog.ag_graph FROM load_test_user;
+REVOKE ALL ON SCHEMA agload_security FROM load_test_user;
+REVOKE ALL ON SCHEMA ag_catalog FROM load_test_user;
+REVOKE pg_read_server_files FROM load_test_user;
+DROP USER load_test_user;
+SELECT drop_graph('agload_security', true);
+NOTICE:  drop cascades to 5 other objects
+DETAIL:  drop cascades to table agload_security._ag_label_vertex
+drop cascades to table agload_security._ag_label_edge
+drop cascades to table agload_security."Person1"
+drop cascades to table agload_security."Person2"
+drop cascades to table agload_security."SecEdge"
+NOTICE:  graph "agload_security" has been dropped
+ drop_graph 
+------------
+ 
+(1 row)
+
 --
 -- End
 --
diff --git a/regress/expected/index.out b/regress/expected/index.out
index 745cab26..ec62bf57 100644
--- a/regress/expected/index.out
+++ b/regress/expected/index.out
@@ -264,19 +264,19 @@ $$) as (n agtype);
 (0 rows)
 
 -- Verify that the incices are created on id columns
-SELECT indexname, indexdef FROM pg_indexes WHERE schemaname= 'cypher_index';
+SELECT indexname, indexdef FROM pg_indexes WHERE schemaname= 'cypher_index' 
ORDER BY 1;
           indexname          |                                            
indexdef                                            
 
-----------------------------+------------------------------------------------------------------------------------------------
+ City_pkey                   | CREATE UNIQUE INDEX "City_pkey" ON 
cypher_index."City" USING btree (id)
+ Country_pkey                | CREATE UNIQUE INDEX "Country_pkey" ON 
cypher_index."Country" USING btree (id)
+ _ag_label_edge_end_id_idx   | CREATE INDEX _ag_label_edge_end_id_idx ON 
cypher_index._ag_label_edge USING btree (end_id)
  _ag_label_edge_pkey         | CREATE UNIQUE INDEX _ag_label_edge_pkey ON 
cypher_index._ag_label_edge USING btree (id)
  _ag_label_edge_start_id_idx | CREATE INDEX _ag_label_edge_start_id_idx ON 
cypher_index._ag_label_edge USING btree (start_id)
- _ag_label_edge_end_id_idx   | CREATE INDEX _ag_label_edge_end_id_idx ON 
cypher_index._ag_label_edge USING btree (end_id)
  _ag_label_vertex_pkey       | CREATE UNIQUE INDEX _ag_label_vertex_pkey ON 
cypher_index._ag_label_vertex USING btree (id)
- idx_pkey                    | CREATE UNIQUE INDEX idx_pkey ON 
cypher_index.idx USING btree (id)
  cypher_index_idx_props_uq   | CREATE UNIQUE INDEX cypher_index_idx_props_uq 
ON cypher_index.idx USING btree (properties)
- Country_pkey                | CREATE UNIQUE INDEX "Country_pkey" ON 
cypher_index."Country" USING btree (id)
- has_city_start_id_idx       | CREATE INDEX has_city_start_id_idx ON 
cypher_index.has_city USING btree (start_id)
  has_city_end_id_idx         | CREATE INDEX has_city_end_id_idx ON 
cypher_index.has_city USING btree (end_id)
- City_pkey                   | CREATE UNIQUE INDEX "City_pkey" ON 
cypher_index."City" USING btree (id)
+ has_city_start_id_idx       | CREATE INDEX has_city_start_id_idx ON 
cypher_index.has_city USING btree (start_id)
+ idx_pkey                    | CREATE UNIQUE INDEX idx_pkey ON 
cypher_index.idx USING btree (id)
 (10 rows)
 
 SET enable_mergejoin = ON;
diff --git a/regress/sql/age_load.sql b/regress/sql/age_load.sql
index cefcfb4c..976f050a 100644
--- a/regress/sql/age_load.sql
+++ b/regress/sql/age_load.sql
@@ -194,6 +194,131 @@ SELECT load_edges_from_file('agload_conversion', 
'Edges1', '../../etc/passwd', t
 --
 SELECT drop_graph('agload_conversion', true);
 
+--
+-- Test security and permissions
+--
+
+SELECT create_graph('agload_security');
+SELECT create_vlabel('agload_security', 'Person1');
+SELECT create_vlabel('agload_security', 'Person2');
+SELECT create_elabel('agload_security', 'SecEdge');
+
+--
+-- Test 1: File read permission (pg_read_server_files role)
+--
+-- Create a user without pg_read_server_files role
+CREATE USER load_test_user;
+GRANT USAGE ON SCHEMA ag_catalog TO load_test_user;
+
+-- This should fail because load_test_user doesn't have pg_read_server_files
+SET ROLE load_test_user;
+SELECT load_labels_from_file('agload_security', 'Person1', 
'age_load/conversion_vertices.csv', true);
+SELECT load_edges_from_file('agload_security', 'SecEdge', 
'age_load/conversion_edges.csv');
+RESET ROLE;
+
+-- Grant pg_read_server_files and try again - should fail on table permission 
now
+GRANT pg_read_server_files TO load_test_user;
+
+--
+-- Test 2: Table INSERT permission (ACL_INSERT)
+--
+-- User has file read permission but no INSERT on the label table
+SET ROLE load_test_user;
+SELECT load_labels_from_file('agload_security', 'Person1', 
'age_load/conversion_vertices.csv', true);
+SELECT load_edges_from_file('agload_security', 'SecEdge', 
'age_load/conversion_edges.csv');
+RESET ROLE;
+
+-- Grant INSERT permission and try again - should succeed
+GRANT USAGE ON SCHEMA agload_security TO load_test_user;
+GRANT INSERT ON agload_security."Person1" TO load_test_user;
+GRANT INSERT ON agload_security."SecEdge" TO load_test_user;
+GRANT UPDATE ON SEQUENCE agload_security."Person1_id_seq" TO load_test_user;
+GRANT UPDATE ON SEQUENCE agload_security."SecEdge_id_seq" TO load_test_user;
+GRANT SELECT ON ag_catalog.ag_label TO load_test_user;
+GRANT SELECT ON ag_catalog.ag_graph TO load_test_user;
+
+SET ROLE load_test_user;
+SELECT load_labels_from_file('agload_security', 'Person1', 
'age_load/conversion_vertices.csv', true);
+SELECT load_edges_from_file('agload_security', 'SecEdge', 
'age_load/conversion_edges.csv');
+RESET ROLE;
+
+-- Verify data was loaded
+SELECT COUNT(*) FROM agload_security."Person1";
+SELECT COUNT(*) FROM agload_security."SecEdge";
+
+-- cleanup
+DELETE FROM agload_security."Person1";
+DELETE FROM agload_security."SecEdge";
+
+--
+-- Test 3: Row-Level Security (RLS)
+--
+
+-- Enable RLS on the label tables
+ALTER TABLE agload_security."Person1" ENABLE ROW LEVEL SECURITY;
+ALTER TABLE agload_security."SecEdge" ENABLE ROW LEVEL SECURITY;
+
+-- Switch to load_test_user
+SET ROLE load_test_user;
+
+-- Loading should fail when RLS is enabled
+SELECT load_labels_from_file('agload_security', 'Person1', 
'age_load/conversion_vertices.csv', true);
+SELECT load_edges_from_file('agload_security', 'SecEdge', 
'age_load/conversion_edges.csv');
+
+RESET ROLE;
+
+-- Disable RLS and try again - should succeed
+ALTER TABLE agload_security."Person1" DISABLE ROW LEVEL SECURITY;
+ALTER TABLE agload_security."SecEdge" DISABLE ROW LEVEL SECURITY;
+
+SELECT load_labels_from_file('agload_security', 'Person1', 
'age_load/conversion_vertices.csv', true);
+SELECT load_edges_from_file('agload_security', 'SecEdge', 
'age_load/conversion_edges.csv');
+
+-- Verify data was loaded
+SELECT COUNT(*) FROM agload_security."Person1";
+SELECT COUNT(*) FROM agload_security."SecEdge";
+
+-- cleanup
+DELETE FROM agload_security."Person1";
+DELETE FROM agload_security."SecEdge";
+
+--
+-- Test 4: Constraint checking (CHECK constraint)
+--
+
+-- Add constraint on vertex properties - fail if bool property is false
+ALTER TABLE agload_security."Person1" ADD CONSTRAINT check_bool_true
+    CHECK ((properties->>'"bool"')::boolean = true);
+
+-- This should fail - constraint violation
+SELECT load_labels_from_file('agload_security', 'Person1', 
'age_load/conversion_vertices.csv', true);
+
+-- Add constraint on edge properties - fail if bool property is false
+ALTER TABLE agload_security."SecEdge" ADD CONSTRAINT check_bool_true
+    CHECK ((properties->>'"bool"')::boolean = true);
+
+-- This should fail - some edges have bool = false
+SELECT load_edges_from_file('agload_security', 'SecEdge', 
'age_load/conversion_edges.csv');
+
+-- cleanup
+ALTER TABLE agload_security."Person1" DROP CONSTRAINT check_bool_true;
+ALTER TABLE agload_security."SecEdge" DROP CONSTRAINT check_bool_true;
+
+--
+-- Cleanup
+--
+REVOKE ALL ON agload_security."Person1" FROM load_test_user;
+REVOKE ALL ON agload_security."SecEdge" FROM load_test_user;
+REVOKE ALL ON SEQUENCE agload_security."Person1_id_seq" FROM load_test_user;
+REVOKE ALL ON SEQUENCE agload_security."SecEdge_id_seq" FROM load_test_user;
+REVOKE ALL ON ag_catalog.ag_label FROM load_test_user;
+REVOKE ALL ON ag_catalog.ag_graph FROM load_test_user;
+REVOKE ALL ON SCHEMA agload_security FROM load_test_user;
+REVOKE ALL ON SCHEMA ag_catalog FROM load_test_user;
+REVOKE pg_read_server_files FROM load_test_user;
+DROP USER load_test_user;
+SELECT drop_graph('agload_security', true);
+
 --
 -- End
 --
diff --git a/regress/sql/index.sql b/regress/sql/index.sql
index a6e075c7..d4a4b24a 100644
--- a/regress/sql/index.sql
+++ b/regress/sql/index.sql
@@ -165,7 +165,7 @@ SELECT * FROM cypher('cypher_index', $$
 $$) as (n agtype);
 
 -- Verify that the incices are created on id columns
-SELECT indexname, indexdef FROM pg_indexes WHERE schemaname= 'cypher_index';
+SELECT indexname, indexdef FROM pg_indexes WHERE schemaname= 'cypher_index' 
ORDER BY 1;
 
 SET enable_mergejoin = ON;
 SET enable_hashjoin = OFF;
diff --git a/src/backend/utils/load/ag_load_edges.c 
b/src/backend/utils/load/ag_load_edges.c
index 931c6e0d..c05bf335 100644
--- a/src/backend/utils/load/ag_load_edges.c
+++ b/src/backend/utils/load/ag_load_edges.c
@@ -16,50 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 #include "postgres.h"
 
-#include "utils/load/ag_load_edges.h"
-#include "utils/load/csv.h"
+#include "access/heapam.h"
+#include "access/table.h"
+#include "catalog/namespace.h"
+#include "commands/copy.h"
+#include "executor/executor.h"
+#include "nodes/makefuncs.h"
+#include "parser/parse_node.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
 
-void edge_field_cb(void *field, size_t field_len, void *data)
-{
-
-    csv_edge_reader *cr = (csv_edge_reader*)data;
-    if (cr->error)
-    {
-        cr->error = 1;
-        ereport(NOTICE,(errmsg("There is some unknown error")));
-    }
-
-    /* check for space to store this field */
-    if (cr->cur_field == cr->alloc)
-    {
-        cr->alloc *= 2;
-        cr->fields = repalloc_check(cr->fields, sizeof(char *) * cr->alloc);
-        cr->fields_len = repalloc_check(cr->header, sizeof(size_t *) * 
cr->alloc);
-        if (cr->fields == NULL)
-        {
-            cr->error = 1;
-            ereport(ERROR,
-                    (errmsg("field_cb: failed to reallocate %zu bytes\n",
-                            sizeof(char *) * cr->alloc)));
-        }
-    }
-    cr->fields_len[cr->cur_field] = field_len;
-    cr->curr_row_length += field_len;
-    cr->fields[cr->cur_field] = pnstrdup((char*)field, field_len);
-    cr->cur_field += 1;
-}
+#include "utils/load/ag_load_edges.h"
 
-/* Parser calls this function when it detects end of a row */
-void edge_row_cb(int delim __attribute__((unused)), void *data)
+/*
+ * Process a single edge row from COPY's raw fields.
+ * Edge CSV format: start_id, start_vertex_type, end_id, end_vertex_type, 
[properties...]
+ */
+static void process_edge_row(char **fields, int nfields,
+                             char **header, int header_count,
+                             int label_id, Oid label_seq_relid,
+                             Oid graph_oid, bool load_as_agtype,
+                             batch_insert_state *batch_state)
 {
-
-    csv_edge_reader *cr = (csv_edge_reader*)data;
-    batch_insert_state *batch_state = cr->batch_state;
-
-    size_t i, n_fields;
     int64 start_id_int;
     graphid start_vertex_graph_id;
     int start_vertex_type_id;
@@ -72,104 +52,92 @@ void edge_row_cb(int delim __attribute__((unused)), void 
*data)
     int64 entry_id;
     TupleTableSlot *slot;
 
-    n_fields = cr->cur_field;
+    char *start_vertex_type;
+    char *end_vertex_type;
+    agtype *edge_properties;
 
-    if (cr->row == 0)
-    {
-        cr->header_num = cr->cur_field;
-        cr->header_row_length = cr->curr_row_length;
-        cr->header_len = (size_t* )palloc(sizeof(size_t *) * cr->cur_field);
-        cr->header = palloc((sizeof (char*) * cr->cur_field));
+    /* Generate edge ID */
+    entry_id = nextval_internal(label_seq_relid, true);
+    edge_id = make_graphid(label_id, entry_id);
 
-        for (i = 0; i<cr->cur_field; i++)
-        {
-            cr->header_len[i] = cr->fields_len[i];
-            cr->header[i] = pnstrdup(cr->fields[i], cr->header_len[i]);
-        }
-    }
-    else
-    {
-        entry_id = nextval_internal(cr->label_seq_relid, true);
-        edge_id = make_graphid(cr->label_id, entry_id);
-
-        start_id_int = strtol(cr->fields[0], NULL, 10);
-        start_vertex_type_id = get_label_id(cr->fields[1], cr->graph_oid);
-        end_id_int = strtol(cr->fields[2], NULL, 10);
-        end_vertex_type_id = get_label_id(cr->fields[3], cr->graph_oid);
-
-        start_vertex_graph_id = make_graphid(start_vertex_type_id, 
start_id_int);
-        end_vertex_graph_id = make_graphid(end_vertex_type_id, end_id_int);
-
-        /* Get the appropriate slot from the batch state */
-        slot = batch_state->slots[batch_state->num_tuples];
-
-        /* Clear the slots contents */
-        ExecClearTuple(slot);
-
-        /* Fill the values in the slot */
-        slot->tts_values[0] = GRAPHID_GET_DATUM(edge_id);
-        slot->tts_values[1] = GRAPHID_GET_DATUM(start_vertex_graph_id);
-        slot->tts_values[2] = GRAPHID_GET_DATUM(end_vertex_graph_id);
-        slot->tts_values[3] = AGTYPE_P_GET_DATUM(
-                                create_agtype_from_list_i(
-                                    cr->header, cr->fields,
-                                    n_fields, 4, cr->load_as_agtype));
-        slot->tts_isnull[0] = false;
-        slot->tts_isnull[1] = false;
-        slot->tts_isnull[2] = false;
-        slot->tts_isnull[3] = false;
-
-        /* Make the slot as containing virtual tuple */
-        ExecStoreVirtualTuple(slot);
-        batch_state->num_tuples++;
-
-        if (batch_state->num_tuples >= batch_state->max_tuples)
-        {
-            /* Insert the batch when it is full (i.e. BATCH_SIZE) */
-            insert_batch(batch_state);
-            batch_state->num_tuples = 0;
-        }
-    }
+    /* Trim whitespace from vertex type names */
+    start_vertex_type = trim_whitespace(fields[1]);
+    end_vertex_type = trim_whitespace(fields[3]);
 
-    for (i = 0; i < n_fields; ++i)
-    {
-        pfree_if_not_null(cr->fields[i]);
-    }
+    /* Parse start vertex info */
+    start_id_int = strtol(fields[0], NULL, 10);
+    start_vertex_type_id = get_label_id(start_vertex_type, graph_oid);
 
-    if (cr->error)
-    {
-        ereport(NOTICE,(errmsg("THere is some error")));
-    }
+    /* Parse end vertex info */
+    end_id_int = strtol(fields[2], NULL, 10);
+    end_vertex_type_id = get_label_id(end_vertex_type, graph_oid);
 
-    cr->cur_field = 0;
-    cr->curr_row_length = 0;
-    cr->row += 1;
-}
+    /* Create graphids for start and end vertices */
+    start_vertex_graph_id = make_graphid(start_vertex_type_id, start_id_int);
+    end_vertex_graph_id = make_graphid(end_vertex_type_id, end_id_int);
 
-static int is_space(unsigned char c)
-{
-    if (c == CSV_SPACE || c == CSV_TAB)
-    {
-        return 1;
-    }
-    else
+    /* Get the appropriate slot from the batch state */
+    slot = batch_state->slots[batch_state->num_tuples];
+
+    /* Clear the slots contents */
+    ExecClearTuple(slot);
+
+    /* Build the agtype properties */
+    edge_properties = create_agtype_from_list_i(header, fields,
+                                                nfields, 4, load_as_agtype);
+
+    /* Fill the values in the slot */
+    slot->tts_values[0] = GRAPHID_GET_DATUM(edge_id);
+    slot->tts_values[1] = GRAPHID_GET_DATUM(start_vertex_graph_id);
+    slot->tts_values[2] = GRAPHID_GET_DATUM(end_vertex_graph_id);
+    slot->tts_values[3] = AGTYPE_P_GET_DATUM(edge_properties);
+    slot->tts_isnull[0] = false;
+    slot->tts_isnull[1] = false;
+    slot->tts_isnull[2] = false;
+    slot->tts_isnull[3] = false;
+
+    /* Make the slot as containing virtual tuple */
+    ExecStoreVirtualTuple(slot);
+
+    batch_state->buffered_bytes += VARSIZE(edge_properties);
+    batch_state->num_tuples++;
+
+    /* Insert the batch when tuple count OR byte threshold is reached */
+    if (batch_state->num_tuples >= BATCH_SIZE ||
+        batch_state->buffered_bytes >= MAX_BUFFERED_BYTES)
     {
-        return 0;
+        insert_batch(batch_state);
+        batch_state->num_tuples = 0;
+        batch_state->buffered_bytes = 0;
     }
 }
 
-static int is_term(unsigned char c)
+/*
+ * Create COPY options for CSV parsing.
+ * Returns a List of DefElem nodes.
+ */
+static List *create_copy_options(void)
 {
-    if (c == CSV_CR || c == CSV_LF)
-    {
-        return 1;
-    }
-    else
-    {
-        return 0;
-    }
+    List *options = NIL;
+
+    /* FORMAT csv */
+    options = lappend(options,
+                      makeDefElem("format",
+                                  (Node *) makeString("csv"),
+                                  -1));
+
+    /* HEADER false - we'll read the header ourselves */
+    options = lappend(options,
+                      makeDefElem("header",
+                                  (Node *) makeBoolean(false),
+                                  -1));
+
+    return options;
 }
 
+/*
+ * Load edges from CSV file using pg's COPY infrastructure.
+ */
 int create_edges_from_csv_file(char *file_path,
                                char *graph_name,
                                Oid graph_oid,
@@ -177,79 +145,133 @@ int create_edges_from_csv_file(char *file_path,
                                int label_id,
                                bool load_as_agtype)
 {
+    Relation        label_rel;
+    Oid             label_relid;
+    CopyFromState   cstate;
+    List           *copy_options;
+    ParseState     *pstate;
+    char          **fields;
+    int             nfields;
+    char          **header = NULL;
+    int             header_count = 0;
+    bool            is_first_row = true;
+    char           *label_seq_name;
+    Oid             label_seq_relid;
+    batch_insert_state *batch_state = NULL;
+    MemoryContext   batch_context;
+    MemoryContext   old_context;
+
+    /* Create a memory context for batch processing - reset after each batch */
+    batch_context = AllocSetContextCreate(CurrentMemoryContext,
+                                          "AGE CSV Edge Load Batch Context",
+                                          ALLOCSET_DEFAULT_SIZES);
+
+    /* Get the label relation */
+    label_relid = get_label_relation(label_name, graph_oid);
+    label_rel = table_open(label_relid, RowExclusiveLock);
+
+    /* Get sequence info */
+    label_seq_name = get_label_seq_relation_name(label_name);
+    label_seq_relid = get_relname_relid(label_seq_name, graph_oid);
+
+    /* Initialize the batch insert state */
+    init_batch_insert(&batch_state, label_name, graph_oid);
+
+    /* Create COPY options for CSV parsing */
+    copy_options = create_copy_options();
+
+    /* Create a minimal ParseState for BeginCopyFrom */
+    pstate = make_parsestate(NULL);
 
-    FILE *fp;
-    struct csv_parser p;
-    char buf[1024];
-    size_t bytes_read;
-    unsigned char options = 0;
-    csv_edge_reader cr;
-    char *label_seq_name;
-
-    if (csv_init(&p, options) != 0)
+    PG_TRY();
     {
-        ereport(ERROR,
-                (errmsg("Failed to initialize csv parser\n")));
-    }
-
-    p.malloc_func = palloc;
-    p.realloc_func = repalloc_check;
-    p.free_func = pfree_if_not_null;
+        /*
+         * Initialize COPY FROM state.
+         * We pass the label relation but will only use NextCopyFromRawFields
+         * which returns raw parsed strings without type conversion.
+         */
+        cstate = BeginCopyFrom(pstate,
+                               label_rel,
+                               NULL,           /* whereClause */
+                               file_path,
+                               false,          /* is_program */
+                               NULL,           /* data_source_cb */
+                               NIL,            /* attnamelist */
+                               copy_options);
+
+        /*
+         * Process rows using COPY's csv parsing.
+         * NextCopyFromRawFields uses 64KB buffers internally.
+         */
+        while (NextCopyFromRawFields(cstate, &fields, &nfields))
+        {
+            if (is_first_row)
+            {
+                int i;
 
-    csv_set_space_func(&p, is_space);
-    csv_set_term_func(&p, is_term);
+                /* First row is the header - save column names (in main 
context) */
+                header_count = nfields;
+                header = (char **) palloc(sizeof(char *) * nfields);
 
-    fp = fopen(file_path, "rb");
-    if (!fp)
-    {
-        ereport(ERROR,
-                (errmsg("Failed to open %s\n", file_path)));
-    }
+                for (i = 0; i < nfields; i++)
+                {
+                    /* Trim whitespace from header fields */
+                    header[i] = trim_whitespace(fields[i]);
+                }
 
-    PG_TRY();
-    {
-        label_seq_name = get_label_seq_relation_name(label_name);
-
-        memset((void*)&cr, 0, sizeof(csv_edge_reader));
-        cr.alloc = 128;
-        cr.fields = palloc(sizeof(char *) * cr.alloc);
-        cr.fields_len = palloc(sizeof(size_t *) * cr.alloc);
-        cr.header_row_length = 0;
-        cr.curr_row_length = 0;
-        cr.graph_name = graph_name;
-        cr.graph_oid = graph_oid;
-        cr.label_name = label_name;
-        cr.label_id = label_id;
-        cr.label_seq_relid = get_relname_relid(label_seq_name, graph_oid);
-        cr.load_as_agtype = load_as_agtype;
-
-        /* Initialize the batch insert state */
-        init_batch_insert(&cr.batch_state, label_name, graph_oid);
-
-        while ((bytes_read=fread(buf, 1, 1024, fp)) > 0)
-        {
-            if (csv_parse(&p, buf, bytes_read, edge_field_cb,
-                        edge_row_cb, &cr) != bytes_read)
+                is_first_row = false;
+            }
+            else
             {
-                ereport(ERROR, (errmsg("Error while parsing file: %s\n",
-                                    csv_strerror(csv_error(&p)))));
+                /* Switch to batch context for row processing */
+                old_context = MemoryContextSwitchTo(batch_context);
+
+                /* Data row - process it */
+                process_edge_row(fields, nfields,
+                                 header, header_count,
+                                 label_id, label_seq_relid,
+                                 graph_oid, load_as_agtype,
+                                 batch_state);
+
+                /* Switch back to main context */
+                MemoryContextSwitchTo(old_context);
+
+                /* Reset batch context after each batch to free memory */
+                if (batch_state->num_tuples == 0)
+                {
+                    MemoryContextReset(batch_context);
+                }
             }
         }
 
-        csv_fini(&p, edge_field_cb, edge_row_cb, &cr);
-
         /* Finish any remaining batch inserts */
-        finish_batch_insert(&cr.batch_state);
+        finish_batch_insert(&batch_state);
+        MemoryContextReset(batch_context);
 
-        if (ferror(fp))
-        {
-            ereport(ERROR, (errmsg("Error while reading file %s\n", 
file_path)));
-        }
+        /* Clean up COPY state */
+        EndCopyFrom(cstate);
     }
     PG_FINALLY();
     {
-        fclose(fp);
-        csv_free(&p);
+        /* Free header if allocated */
+        if (header != NULL)
+        {
+            int i;
+            for (i = 0; i < header_count; i++)
+            {
+                pfree(header[i]);
+            }
+            pfree(header);
+        }
+
+        /* Close the relation */
+        table_close(label_rel, RowExclusiveLock);
+
+        /* Delete batch context */
+        MemoryContextDelete(batch_context);
+
+        /* Free parse state */
+        free_parsestate(pstate);
     }
     PG_END_TRY();
 
diff --git a/src/backend/utils/load/ag_load_labels.c 
b/src/backend/utils/load/ag_load_labels.c
index 1e86bbda..5b11f68b 100644
--- a/src/backend/utils/load/ag_load_labels.c
+++ b/src/backend/utils/load/ag_load_labels.c
@@ -17,155 +17,114 @@
  * under the License.
  */
 #include "postgres.h"
-#include "executor/spi.h"
+
+#include "access/heapam.h"
+#include "access/table.h"
 #include "catalog/namespace.h"
+#include "commands/copy.h"
 #include "executor/executor.h"
+#include "nodes/makefuncs.h"
+#include "parser/parse_node.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
 
 #include "utils/load/ag_load_labels.h"
-#include "utils/load/csv.h"
-
-void vertex_field_cb(void *field, size_t field_len, void *data)
-{
-
-    csv_vertex_reader *cr = (csv_vertex_reader *) data;
-
-    if (cr->error)
-    {
-        cr->error = 1;
-        ereport(NOTICE,(errmsg("There is some unknown error")));
-    }
-
-    /* check for space to store this field */
-    if (cr->cur_field == cr->alloc)
-    {
-        cr->alloc *= 2;
-        cr->fields = repalloc_check(cr->fields, sizeof(char *) * cr->alloc);
-        cr->fields_len = repalloc_check(cr->header, sizeof(size_t *) * cr->alloc);
-        if (cr->fields == NULL)
-        {
-            cr->error = 1;
-            ereport(ERROR,
-                    (errmsg("field_cb: failed to reallocate %zu bytes\n",
-                            sizeof(char *) * cr->alloc)));
-        }
-    }
-    cr->fields_len[cr->cur_field] = field_len;
-    cr->curr_row_length += field_len;
-    cr->fields[cr->cur_field] = pnstrdup((char *) field, field_len);
-    cr->cur_field += 1;
-}
 
-void vertex_row_cb(int delim __attribute__((unused)), void *data)
+/*
+ * Process a single vertex row from COPY's raw fields.
+ * Vertex CSV format: [id,] [properties...]
+ */
+static void process_vertex_row(char **fields, int nfields,
+                               char **header, int header_count,
+                               int label_id, Oid label_seq_relid,
+                               bool id_field_exists, bool load_as_agtype,
+                               int64 *curr_seq_num,
+                               batch_insert_state *batch_state)
 {
-    csv_vertex_reader *cr = (csv_vertex_reader*)data;
-    batch_insert_state *batch_state = cr->batch_state;
-    size_t i, n_fields;
     graphid vertex_id;
     int64 entry_id;
     TupleTableSlot *slot;
+    agtype *vertex_properties;
 
-    n_fields = cr->cur_field;
-
-    if (cr->row == 0)
+    /* Generate or use provided entry_id */
+    if (id_field_exists)
     {
-        cr->header_num = cr->cur_field;
-        cr->header_row_length = cr->curr_row_length;
-        cr->header_len = (size_t* )palloc(sizeof(size_t *) * cr->cur_field);
-        cr->header = palloc((sizeof (char*) * cr->cur_field));
-
-        for (i = 0; i<cr->cur_field; i++)
+        entry_id = strtol(fields[0], NULL, 10);
+        if (entry_id > *curr_seq_num)
         {
-            cr->header_len[i] = cr->fields_len[i];
-            cr->header[i] = pnstrdup(cr->fields[i], cr->header_len[i]);
+            /* This is needed to ensure the sequence is up-to-date */
+            DirectFunctionCall2(setval_oid,
+                                ObjectIdGetDatum(label_seq_relid),
+                                Int64GetDatum(entry_id));
+            *curr_seq_num = entry_id;
         }
     }
     else
     {
-        if (cr->id_field_exists)
-        {
-            entry_id = strtol(cr->fields[0], NULL, 10);
-            if (entry_id > cr->curr_seq_num)
-            {
-                DirectFunctionCall2(setval_oid,
-                                    ObjectIdGetDatum(cr->label_seq_relid),
-                                    Int64GetDatum(entry_id));
-                cr->curr_seq_num = entry_id;
-            }
-        }
-        else
-        {
-            entry_id = nextval_internal(cr->label_seq_relid, true);
-        }
+        entry_id = nextval_internal(label_seq_relid, true);
+    }
 
-        vertex_id = make_graphid(cr->label_id, entry_id);
+    vertex_id = make_graphid(label_id, entry_id);
 
-        /* Get the appropriate slot from the batch state */
-        slot = batch_state->slots[batch_state->num_tuples];
+    /* Get the appropriate slot from the batch state */
+    slot = batch_state->slots[batch_state->num_tuples];
 
-        /* Clear the slots contents */
-        ExecClearTuple(slot);
+    /* Clear the slots contents */
+    ExecClearTuple(slot);
 
-        /* Fill the values in the slot */
-        slot->tts_values[0] = GRAPHID_GET_DATUM(vertex_id);
-        slot->tts_values[1] = AGTYPE_P_GET_DATUM(
-                                create_agtype_from_list(cr->header, cr->fields,
-                                                        n_fields, entry_id,
-                                                        cr->load_as_agtype));
-        slot->tts_isnull[0] = false;
-        slot->tts_isnull[1] = false;
+    /* Build the agtype properties */
+    vertex_properties = create_agtype_from_list(header, fields,
+                                                nfields, entry_id,
+                                                load_as_agtype);
 
-        /* Make the slot as containing virtual tuple */
-        ExecStoreVirtualTuple(slot);
+    /* Fill the values in the slot */
+    slot->tts_values[0] = GRAPHID_GET_DATUM(vertex_id);
+    slot->tts_values[1] = AGTYPE_P_GET_DATUM(vertex_properties);
+    slot->tts_isnull[0] = false;
+    slot->tts_isnull[1] = false;
 
-        batch_state->num_tuples++;
+    /* Make the slot as containing virtual tuple */
+    ExecStoreVirtualTuple(slot);
 
-        if (batch_state->num_tuples >= batch_state->max_tuples)
-        {
-            /* Insert the batch when it is full (i.e. BATCH_SIZE) */
-            insert_batch(batch_state);
-            batch_state->num_tuples = 0;
-        }
-    }
+    batch_state->buffered_bytes += VARSIZE(vertex_properties);
+    batch_state->num_tuples++;
 
-    for (i = 0; i < n_fields; ++i)
+    /* Insert the batch when tuple count OR byte threshold is reached */
+    if (batch_state->num_tuples >= BATCH_SIZE ||
+        batch_state->buffered_bytes >= MAX_BUFFERED_BYTES)
     {
-        pfree_if_not_null(cr->fields[i]);
+        insert_batch(batch_state);
+        batch_state->num_tuples = 0;
+        batch_state->buffered_bytes = 0;
     }
-
-    if (cr->error)
-    {
-        ereport(NOTICE,(errmsg("THere is some error")));
-    }
-
-    cr->cur_field = 0;
-    cr->curr_row_length = 0;
-    cr->row += 1;
 }
 
-static int is_space(unsigned char c)
+/*
+ * Create COPY options for csv parsing.
+ * Returns a List of DefElem nodes.
+ */
+static List *create_copy_options(void)
 {
-    if (c == CSV_SPACE || c == CSV_TAB)
-    {
-        return 1;
-    }
-    else
-    {
-        return 0;
-    }
+    List *options = NIL;
 
-}
-static int is_term(unsigned char c)
-{
-    if (c == CSV_CR || c == CSV_LF)
-    {
-        return 1;
-    }
-    else
-    {
-        return 0;
-    }
+    /* FORMAT csv */
+    options = lappend(options,
+                      makeDefElem("format",
+                                  (Node *) makeString("csv"),
+                                  -1));
+
+    /* HEADER false - we'll read the header ourselves */
+    options = lappend(options,
+                      makeDefElem("header",
+                                  (Node *) makeBoolean(false),
+                                  -1));
+
+    return options;
 }
 
+/*
+ * Load vertex labels from csv file using pg's COPY infrastructure.
+ */
 int create_labels_from_csv_file(char *file_path,
                                 char *graph_name,
                                 Oid graph_oid,
@@ -174,96 +133,146 @@ int create_labels_from_csv_file(char *file_path,
                                 bool id_field_exists,
                                 bool load_as_agtype)
 {
-
-    FILE *fp;
-    struct csv_parser p;
-    char buf[1024];
-    size_t bytes_read;
-    unsigned char options = 0;
-    csv_vertex_reader cr;
-    char *label_seq_name;
-
-    if (csv_init(&p, options) != 0)
+    Relation        label_rel;
+    Oid             label_relid;
+    CopyFromState   cstate;
+    List           *copy_options;
+    ParseState     *pstate;
+    char          **fields;
+    int             nfields;
+    char          **header = NULL;
+    int             header_count = 0;
+    bool            is_first_row = true;
+    char           *label_seq_name;
+    Oid             label_seq_relid;
+    int64           curr_seq_num = 0;
+    batch_insert_state *batch_state = NULL;
+    MemoryContext   batch_context;
+    MemoryContext   old_context;
+
+    /* Create a memory context for batch processing - reset after each batch */
+    batch_context = AllocSetContextCreate(CurrentMemoryContext,
+                                          "AGE CSV Load Batch Context",
+                                          ALLOCSET_DEFAULT_SIZES);
+
+    /* Get the label relation */
+    label_relid = get_label_relation(label_name, graph_oid);
+    label_rel = table_open(label_relid, RowExclusiveLock);
+
+    /* Get sequence info */
+    label_seq_name = get_label_seq_relation_name(label_name);
+    label_seq_relid = get_relname_relid(label_seq_name, graph_oid);
+
+    if (id_field_exists)
     {
-        ereport(ERROR,
-                (errmsg("Failed to initialize csv parser\n")));
+        /*
+         * Set the curr_seq_num since we will need it to compare with
+         * incoming entry_id.
+         */
+        curr_seq_num = nextval_internal(label_seq_relid, true);
     }
 
-    p.malloc_func = palloc;
-    p.realloc_func = repalloc_check;
-    p.free_func = pfree_if_not_null;
+    /* Initialize the batch insert state */
+    init_batch_insert(&batch_state, label_name, graph_oid);
 
-    csv_set_space_func(&p, is_space);
-    csv_set_term_func(&p, is_term);
+    /* Create COPY options for CSV parsing */
+    copy_options = create_copy_options();
 
-    fp = fopen(file_path, "rb");
-    if (!fp)
-    {
-        ereport(ERROR,
-                (errmsg("Failed to open %s\n", file_path)));
-    }
+    /* Create a minimal ParseState for BeginCopyFrom */
+    pstate = make_parsestate(NULL);
 
     PG_TRY();
     {
-        label_seq_name = get_label_seq_relation_name(label_name);
-
-        memset((void*)&cr, 0, sizeof(csv_vertex_reader));
-
-        cr.alloc = 2048;
-        cr.fields = palloc(sizeof(char *) * cr.alloc);
-        cr.fields_len = palloc(sizeof(size_t *) * cr.alloc);
-        cr.header_row_length = 0;
-        cr.curr_row_length = 0;
-        cr.graph_name = graph_name;
-        cr.graph_oid = graph_oid;
-        cr.label_name = label_name;
-        cr.label_id = label_id;
-        cr.id_field_exists = id_field_exists;
-        cr.label_seq_relid = get_relname_relid(label_seq_name, graph_oid);
-        cr.load_as_agtype = load_as_agtype;
-            
-        if (cr.id_field_exists)
+        /*
+         * Initialize COPY FROM state.
+         * We pass the label relation but will only use NextCopyFromRawFields
+         * which returns raw parsed strings without type conversion.
+         */
+        cstate = BeginCopyFrom(pstate,
+                               label_rel,
+                               NULL,           /* whereClause */
+                               file_path,
+                               false,          /* is_program */
+                               NULL,           /* data_source_cb */
+                               NIL,            /* attnamelist - NULL means all columns */
+                               copy_options);
+
+        /*
+         * Process rows using COPY's csv parsing.
+         * NextCopyFromRawFields uses 64KB buffers internally.
+         */
+        while (NextCopyFromRawFields(cstate, &fields, &nfields))
         {
-            /*
-            * Set the curr_seq_num since we will need it to compare with
-            * incoming entry_id.
-            * 
-            * We cant use currval because it will error out if nextval was
-            * not called before in the session.
-            */
-            cr.curr_seq_num = nextval_internal(cr.label_seq_relid, true);
-        }
+            if (is_first_row)
+            {
+                int i;
 
-        /* Initialize the batch insert state */
-        init_batch_insert(&cr.batch_state, label_name, graph_oid);
+                /* First row is the header - save column names (in main context) */
+                header_count = nfields;
+                header = (char **) palloc(sizeof(char *) * nfields);
 
-        while ((bytes_read=fread(buf, 1, 1024, fp)) > 0)
-        {
-            if (csv_parse(&p, buf, bytes_read, vertex_field_cb,
-                        vertex_row_cb, &cr) != bytes_read)
+                for (i = 0; i < nfields; i++)
+                {
+                    /* Trim whitespace from header fields */
+                    header[i] = trim_whitespace(fields[i]);
+                }
+
+                is_first_row = false;
+            }
+            else
             {
-                ereport(ERROR, (errmsg("Error while parsing file: %s\n",
-                                    csv_strerror(csv_error(&p)))));
+                /* Switch to batch context for row processing */
+                old_context = MemoryContextSwitchTo(batch_context);
+
+                /* Data row - process it */
+                process_vertex_row(fields, nfields,
+                                   header, header_count,
+                                   label_id, label_seq_relid,
+                                   id_field_exists, load_as_agtype,
+                                   &curr_seq_num,
+                                   batch_state);
+
+                /* Switch back to main context */
+                MemoryContextSwitchTo(old_context);
+
+                /* Reset batch context after each batch to free memory */
+                if (batch_state->num_tuples == 0)
+                {
+                    MemoryContextReset(batch_context);
+                }
             }
         }
 
-        csv_fini(&p, vertex_field_cb, vertex_row_cb, &cr);
-
         /* Finish any remaining batch inserts */
-        finish_batch_insert(&cr.batch_state);
+        finish_batch_insert(&batch_state);
+        MemoryContextReset(batch_context);
 
-        if (ferror(fp))
-        {
-            ereport(ERROR, (errmsg("Error while reading file %s\n",
-                                file_path)));
-        }
+        /* Clean up COPY state */
+        EndCopyFrom(cstate);
     }
     PG_FINALLY();
     {
-        fclose(fp);
-        csv_free(&p);
+        /* Free header if allocated */
+        if (header != NULL)
+        {
+            int i;
+            for (i = 0; i < header_count; i++)
+            {
+                pfree(header[i]);
+            }
+            pfree(header);
+        }
+
+        /* Close the relation */
+        table_close(label_rel, RowExclusiveLock);
+
+        /* Delete batch context */
+        MemoryContextDelete(batch_context);
+
+        /* Free parse state */
+        free_parsestate(pstate);
     }
     PG_END_TRY();
 
     return EXIT_SUCCESS;
-}
\ No newline at end of file
+}
diff --git a/src/backend/utils/load/age_load.c b/src/backend/utils/load/age_load.c
index c7cf0677..f9634668 100644
--- a/src/backend/utils/load/age_load.c
+++ b/src/backend/utils/load/age_load.c
@@ -18,24 +18,81 @@
  */
 
 #include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/table.h"
+#include "access/tableam.h"
+#include "access/xact.h"
 #include "catalog/indexing.h"
+#include "catalog/pg_authid.h"
 #include "executor/executor.h"
+#include "miscadmin.h"
+#include "nodes/parsenodes.h"
+#include "parser/parse_relation.h"
+#include "utils/acl.h"
 #include "utils/json.h"
+#include "utils/rel.h"
+#include "utils/rls.h"
 
 #include "utils/load/ag_load_edges.h"
 #include "utils/load/ag_load_labels.h"
 #include "utils/load/age_load.h"
-#include "utils/rel.h"
 
 static agtype_value *csv_value_to_agtype_value(char *csv_val);
 static Oid get_or_create_graph(const Name graph_name);
 static int32 get_or_create_label(Oid graph_oid, char *graph_name,
                                  char *label_name, char label_kind);
 static char *build_safe_filename(char *name);
+static void check_file_read_permission(void);
+static void check_table_permissions(Oid relid);
+static void check_rls_for_load(Oid relid);
 
 #define AGE_BASE_CSV_DIRECTORY "/tmp/age/"
 #define AGE_CSV_FILE_EXTENSION ".csv"
 
+/*
+ * Trim leading and trailing whitespace from a string.
+ * Returns a newly allocated string with whitespace removed.
+ * Returns empty string for NULL input.
+ */
+char *trim_whitespace(const char *str)
+{
+    const char *start;
+    const char *end;
+    size_t len;
+
+    if (str == NULL)
+    {
+        return pstrdup("");
+    }
+
+    /* Find first non-whitespace character */
+    start = str;
+    while (*start && (*start == ' ' || *start == '\t' ||
+                      *start == '\n' || *start == '\r'))
+    {
+        start++;
+    }
+
+    /* If string is all whitespace, return empty string */
+    if (*start == '\0')
+    {
+        return pstrdup("");
+    }
+
+    /* Find last non-whitespace character */
+    end = str + strlen(str) - 1;
+    while (end > start && (*end == ' ' || *end == '\t' ||
+                           *end == '\n' || *end == '\r'))
+    {
+        end--;
+    }
+
+    /* Copy the trimmed string */
+    len = end - start + 1;
+    return pnstrdup(start, len);
+}
+
 static char *build_safe_filename(char *name)
 {
     int length;
@@ -88,6 +145,51 @@ static char *build_safe_filename(char *name)
     return resolved;
 }
 
+/*
+ * Check if the current user has permission to read server files.
+ * Only users with the pg_read_server_files role can load from files.
+ */
+static void check_file_read_permission(void)
+{
+    if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_SERVER_FILES))
+    {
+        ereport(ERROR,
+                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                 errmsg("permission denied to LOAD from a file"),
+                 errdetail("Only roles with privileges of the \"%s\" role may LOAD from a file.",
+                           "pg_read_server_files")));
+    }
+}
+
+/*
+ * Check if the current user has INSERT permission on the target table.
+ */
+static void check_table_permissions(Oid relid)
+{
+    AclResult aclresult;
+
+    aclresult = pg_class_aclcheck(relid, GetUserId(), ACL_INSERT);
+    if (aclresult != ACLCHECK_OK)
+    {
+        aclcheck_error(aclresult, OBJECT_TABLE, get_rel_name(relid));
+    }
+}
+
+/*
+ * Check if RLS is enabled on the target table.
+ * CSV loading is not supported with row-level security.
+ */
+static void check_rls_for_load(Oid relid)
+{
+    if (check_enable_rls(relid, InvalidOid, true) == RLS_ENABLED)
+    {
+        ereport(ERROR,
+                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                 errmsg("LOAD from file is not supported with row-level security"),
+                 errhint("Use Cypher CREATE clause instead.")));
+    }
+}
+
 agtype *create_empty_agtype(void)
 {
     agtype* out;
@@ -118,6 +220,14 @@ static agtype_value *csv_value_to_agtype_value(char *csv_val)
     char *new_csv_val;
     agtype_value *res;
 
+    /* Handle NULL or empty input - return null agtype value */
+    if (csv_val == NULL || csv_val[0] == '\0')
+    {
+        res = palloc(sizeof(agtype_value));
+        res->type = AGTV_NULL;
+        return res;
+    }
+
     if (!json_validate(cstring_to_text(csv_val), false, false))
     {
         /* wrap the string with double-quote */
@@ -175,18 +285,40 @@ agtype *create_agtype_from_list(char **header, char **fields, size_t fields_len,
 
     for (i = 0; i<fields_len; i++)
     {
+        char *trimmed_value;
+
+        /* Skip empty header fields (e.g., from trailing commas) */
+        if (header[i] == NULL || header[i][0] == '\0')
+        {
+            continue;
+        }
+
         key_agtype = string_to_agtype_value(header[i]);
         result.res = push_agtype_value(&result.parse_state,
                                        WAGT_KEY,
                                        key_agtype);
 
+        /* Trim whitespace from field value */
+        trimmed_value = trim_whitespace(fields[i]);
+
         if (load_as_agtype)
         {
-            value_agtype = csv_value_to_agtype_value(fields[i]);
+            value_agtype = csv_value_to_agtype_value(trimmed_value);
         }
         else
         {
-            value_agtype = string_to_agtype_value(fields[i]);
+            /* Handle empty field values */
+            if (trimmed_value[0] == '\0')
+            {
+                value_agtype = palloc(sizeof(agtype_value));
+                value_agtype->type = AGTV_STRING;
+                value_agtype->val.string.len = 0;
+                value_agtype->val.string.val = pstrdup("");
+            }
+            else
+            {
+                value_agtype = string_to_agtype_value(trimmed_value);
+            }
         }
 
         result.res = push_agtype_value(&result.parse_state,
@@ -228,18 +360,40 @@ agtype* create_agtype_from_list_i(char **header, char **fields,
 
     for (i = start_index; i < fields_len; i++)
     {
+        char *trimmed_value;
+
+        /* Skip empty header fields (e.g., from trailing commas) */
+        if (header[i] == NULL || header[i][0] == '\0')
+        {
+            continue;
+        }
+
         key_agtype = string_to_agtype_value(header[i]);
         result.res = push_agtype_value(&result.parse_state,
                                        WAGT_KEY,
                                        key_agtype);
 
+        /* Trim whitespace from field value */
+        trimmed_value = trim_whitespace(fields[i]);
+
         if (load_as_agtype)
         {
-            value_agtype = csv_value_to_agtype_value(fields[i]);
+            value_agtype = csv_value_to_agtype_value(trimmed_value);
         }
         else
         {
-            value_agtype = string_to_agtype_value(fields[i]);
+            /* Handle empty field values */
+            if (trimmed_value[0] == '\0')
+            {
+                value_agtype = palloc(sizeof(agtype_value));
+                value_agtype->type = AGTV_STRING;
+                value_agtype->val.string.len = 0;
+                value_agtype->val.string.val = pstrdup("");
+            }
+            else
+            {
+                value_agtype = string_to_agtype_value(trimmed_value);
+            }
         }
 
         result.res = push_agtype_value(&result.parse_state,
@@ -362,11 +516,24 @@ void insert_batch(batch_insert_state *batch_state)
     List *result;
     int i;
 
+    /* Check constraints for each tuple before inserting */
+    if (batch_state->resultRelInfo->ri_RelationDesc->rd_att->constr)
+    {
+        for (i = 0; i < batch_state->num_tuples; i++)
+        {
+            ExecConstraints(batch_state->resultRelInfo,
+                            batch_state->slots[i],
+                            batch_state->estate);
+        }
+    }
+
     /* Insert the tuples */
     heap_multi_insert(batch_state->resultRelInfo->ri_RelationDesc,
                       batch_state->slots, batch_state->num_tuples,
-                      GetCurrentCommandId(true), 0, NULL);
-    
+                      GetCurrentCommandId(true),
+                      TABLE_INSERT_SKIP_FSM,  /* Skip free space map for bulk */
+                      batch_state->bistate);  /* Use bulk insert state */
+
     /* Insert index entries for the tuples */
     if (batch_state->resultRelInfo->ri_NumIndices > 0)
     {
@@ -405,6 +572,7 @@ Datum load_labels_from_file(PG_FUNCTION_ARGS)
     char* label_name_str;
     char* file_path_str;
     Oid graph_oid;
+    Oid label_relid;
     int32 label_id;
     bool id_field_exists;
     bool load_as_agtype;
@@ -427,6 +595,9 @@ Datum load_labels_from_file(PG_FUNCTION_ARGS)
                 errmsg("file path must not be NULL")));
     }
 
+    /* Check file read permission first */
+    check_file_read_permission();
+
     graph_name = PG_GETARG_NAME(0);
     label_name = PG_GETARG_NAME(1);
     file_name = PG_GETARG_TEXT_P(2);
@@ -447,6 +618,11 @@ Datum load_labels_from_file(PG_FUNCTION_ARGS)
     label_id = get_or_create_label(graph_oid, graph_name_str,
                                    label_name_str, LABEL_KIND_VERTEX);
 
+    /* Get the label relation and check permissions */
+    label_relid = get_label_relation(label_name_str, graph_oid);
+    check_table_permissions(label_relid);
+    check_rls_for_load(label_relid);
+
     create_labels_from_csv_file(file_path_str, graph_name_str, graph_oid,
                                 label_name_str, label_id, id_field_exists,
                                 load_as_agtype);
@@ -459,7 +635,6 @@ Datum load_labels_from_file(PG_FUNCTION_ARGS)
 PG_FUNCTION_INFO_V1(load_edges_from_file);
 Datum load_edges_from_file(PG_FUNCTION_ARGS)
 {
-
     Name graph_name;
     Name label_name;
     text* file_name;
@@ -467,6 +642,7 @@ Datum load_edges_from_file(PG_FUNCTION_ARGS)
     char* label_name_str;
     char* file_path_str;
     Oid graph_oid;
+    Oid label_relid;
     int32 label_id;
     bool load_as_agtype;
 
@@ -488,6 +664,9 @@ Datum load_edges_from_file(PG_FUNCTION_ARGS)
                 errmsg("file path must not be NULL")));
     }
 
+    /* Check file read permission first */
+    check_file_read_permission();
+
     graph_name = PG_GETARG_NAME(0);
     label_name = PG_GETARG_NAME(1);
     file_name = PG_GETARG_TEXT_P(2);
@@ -507,6 +686,11 @@ Datum load_edges_from_file(PG_FUNCTION_ARGS)
     label_id = get_or_create_label(graph_oid, graph_name_str,
                                    label_name_str, LABEL_KIND_EDGE);
 
+    /* Get the label relation and check permissions */
+    label_relid = get_label_relation(label_name_str, graph_oid);
+    check_table_permissions(label_relid);
+    check_rls_for_load(label_relid);
+
     create_edges_from_csv_file(file_path_str, graph_name_str, graph_oid,
                                label_name_str, label_id, load_as_agtype);
 
@@ -597,19 +781,42 @@ void init_batch_insert(batch_insert_state **batch_state,
     Oid relid;
     EState *estate;
     ResultRelInfo *resultRelInfo;
+    RangeTblEntry *rte;
+    RTEPermissionInfo *perminfo;
+    List *range_table = NIL;
+    List *perminfos = NIL;
     int i;
 
-    /* Open the relation */
+    /* Get the relation OID */
     relid = get_label_relation(label_name, graph_oid);
-    relation = table_open(relid, RowExclusiveLock);
 
     /* Initialize executor state */
     estate = CreateExecutorState();
 
-    /* Initialize resultRelInfo */
+    /* Create range table entry for ExecConstraints */
+    rte = makeNode(RangeTblEntry);
+    rte->rtekind = RTE_RELATION;
+    rte->relid = relid;
+    rte->relkind = RELKIND_RELATION;
+    rte->rellockmode = RowExclusiveLock;
+    rte->perminfoindex = 1;
+    range_table = list_make1(rte);
+
+    /* Create permission info */
+    perminfo = makeNode(RTEPermissionInfo);
+    perminfo->relid = relid;
+    perminfo->requiredPerms = ACL_INSERT;
+    perminfos = list_make1(perminfo);
+
+    /* Initialize range table in executor state */
+    ExecInitRangeTable(estate, range_table, perminfos);
+
+    /* Initialize resultRelInfo - this opens the relation */
     resultRelInfo = makeNode(ResultRelInfo);
-    InitResultRelInfo(resultRelInfo, relation, 1, NULL, estate->es_instrument);
-    estate->es_result_relations = &resultRelInfo;
+    ExecInitResultRelation(estate, resultRelInfo, 1);
+
+    /* Get relation from resultRelInfo (opened by ExecInitResultRelation) */
+    relation = resultRelInfo->ri_RelationDesc;
 
     /* Open the indices */
     ExecOpenIndices(resultRelInfo, false);
@@ -619,8 +826,9 @@ void init_batch_insert(batch_insert_state **batch_state,
     (*batch_state)->slots = palloc(sizeof(TupleTableSlot *) * BATCH_SIZE);
     (*batch_state)->estate = estate;
     (*batch_state)->resultRelInfo = resultRelInfo;
-    (*batch_state)->max_tuples = BATCH_SIZE;
     (*batch_state)->num_tuples = 0;
+    (*batch_state)->buffered_bytes = 0;
+    (*batch_state)->bistate = GetBulkInsertState();
 
     /* Create slots */
     for (i = 0; i < BATCH_SIZE; i++)
@@ -651,12 +859,14 @@ void finish_batch_insert(batch_insert_state **batch_state)
         ExecDropSingleTupleTableSlot((*batch_state)->slots[i]);
     }
 
-    /* Clean up, close the indices and relation */
-    ExecCloseIndices((*batch_state)->resultRelInfo);
-    table_close((*batch_state)->resultRelInfo->ri_RelationDesc,
-                RowExclusiveLock);
+    /* Free BulkInsertState */
+    FreeBulkInsertState((*batch_state)->bistate);
+
+    /* Close result relations and range table relations */
+    ExecCloseResultRelations((*batch_state)->estate);
+    ExecCloseRangeTableRelations((*batch_state)->estate);
 
-    /* Clean up batch state */
+    /* Clean up executor state */
     FreeExecutorState((*batch_state)->estate);
     pfree((*batch_state)->slots);
     pfree(*batch_state);
diff --git a/src/backend/utils/load/libcsv.c b/src/backend/utils/load/libcsv.c
deleted file mode 100644
index f0e8b46b..00000000
--- a/src/backend/utils/load/libcsv.c
+++ /dev/null
@@ -1,549 +0,0 @@
-/*
-libcsv - parse and write csv data
-Copyright (C) 2008  Robert Gamble
-
-This library is free software; you can redistribute it and/or
-modify it under the terms of the GNU Lesser General Public
-License as published by the Free Software Foundation; either
-version 2.1 of the License, or (at your option) any later version.
-
-This library is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-Lesser General Public License for more details.
-
-You should have received a copy of the GNU Lesser General Public
-License along with this library; if not, write to the Free Software
-Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
-*/
-
-#include <assert.h>
-
-#if __STDC_VERSION__ >= 199901L
-#  include <stdint.h>
-#else
- /* C89 doesn't have stdint.h or SIZE_MAX */
-#  define SIZE_MAX ((size_t)-1)
-#endif
-
-#include "utils/load/csv.h"
-
-#define VERSION "3.0.3"
-
-#define ROW_NOT_BEGUN           0
-#define FIELD_NOT_BEGUN         1
-#define FIELD_BEGUN             2
-#define FIELD_MIGHT_HAVE_ENDED  3
-
-/*
-  Explanation of states
-  ROW_NOT_BEGUN    There have not been any fields encountered for this row
-  FIELD_NOT_BEGUN  There have been fields but we are currently not in one
-  FIELD_BEGUN      We are in a field
-  FIELD_MIGHT_HAVE_ENDED
-                   We encountered a double quote inside a quoted field, the
-                   field is either ended or the quote is literal
-*/
-
-#define MEM_BLK_SIZE 128
-
-#define SUBMIT_FIELD(p) \
-  do { \
-   if (!quoted) \
-     entry_pos -= spaces; \
-   if (p->options & CSV_APPEND_NULL) \
-     ((p)->entry_buf[entry_pos]) = '\0'; \
-   if (cb1 && (p->options & CSV_EMPTY_IS_NULL) && !quoted && entry_pos == 0) \
-     cb1(NULL, entry_pos, data); \
-   else if (cb1) \
-     cb1(p->entry_buf, entry_pos, data); \
-   pstate = FIELD_NOT_BEGUN; \
-   entry_pos = quoted = spaces = 0; \
- } while (0)
-
-#define SUBMIT_ROW(p, c) \
-  do { \
-    if (cb2) \
-      cb2(c, data); \
-    pstate = ROW_NOT_BEGUN; \
-    entry_pos = quoted = spaces = 0; \
-  } while (0)
-
-#define SUBMIT_CHAR(p, c) ((p)->entry_buf[entry_pos++] = (c))
-
-static const char *csv_errors[] = {"success",
-                             "error parsing data while strict checking enabled",
-                             "memory exhausted while increasing buffer size",
-                             "data size too large",
-                             "invalid status code"};
-
-int
-csv_error(const struct csv_parser *p)
-{
-  assert(p && "received null csv_parser");
-
-  /* Return the current status of the parser */
-  return p->status;
-}
-
-const char *
-csv_strerror(int status)
-{
-  /* Return a textual description of status */
-  if (status >= CSV_EINVALID || status < 0)
-    return csv_errors[CSV_EINVALID];
-  else
-    return csv_errors[status];
-}
-
-int
-csv_get_opts(const struct csv_parser *p)
-{
-  /* Return the currently set options of parser */
-  if (p == NULL)
-    return -1;
-
-  return p->options;
-}
-
-int
-csv_set_opts(struct csv_parser *p, unsigned char options)
-{
-  /* Set the options */
-  if (p == NULL)
-    return -1;
-
-  p->options = options;
-  return 0;
-}
-
-int
-csv_init(struct csv_parser *p, unsigned char options)
-{
-  /* Initialize a csv_parser object returns 0 on success, -1 on error */
-  if (p == NULL)
-    return -1;
-
-  p->entry_buf = NULL;
-  p->pstate = ROW_NOT_BEGUN;
-  p->quoted = 0;
-  p->spaces = 0;
-  p->entry_pos = 0;
-  p->entry_size = 0;
-  p->status = 0;
-  p->options = options;
-  p->quote_char = CSV_QUOTE;
-  p->delim_char = CSV_COMMA;
-  p->is_space = NULL;
-  p->is_term = NULL;
-  p->blk_size = MEM_BLK_SIZE;
-  p->malloc_func = NULL;
-  p->realloc_func = realloc;
-  p->free_func = free;
-
-  return 0;
-}
-
-void
-csv_free(struct csv_parser *p)
-{
-  /* Free the entry_buffer of csv_parser object */
-  if (p == NULL)
-    return;
-
-  if (p->entry_buf && p->free_func)
-    p->free_func(p->entry_buf);
-
-  p->entry_buf = NULL;
-  p->entry_size = 0;
-
-  return;
-}
-
-int
-csv_fini(struct csv_parser *p, void (*cb1)(void *, size_t, void *), void (*cb2)(int c, void *), void *data)
-{
-  int quoted;
-  int pstate;
-  size_t spaces;
-  size_t entry_pos;
-
-  if (p == NULL)
-    return -1;
-
-  /* Finalize parsing.  Needed, for example, when file does not end in a newline */
-  quoted = p->quoted;
-  pstate = p->pstate;
-  spaces = p->spaces;
-  entry_pos = p->entry_pos;
-
-  if ((pstate == FIELD_BEGUN) && p->quoted && (p->options & CSV_STRICT) && (p->options & CSV_STRICT_FINI)) {
-    /* Current field is quoted, no end-quote was seen, and CSV_STRICT_FINI is set */
-    p->status = CSV_EPARSE;
-    return -1;
-  }
-
-  switch (pstate) {
-    case FIELD_MIGHT_HAVE_ENDED:
-      p->entry_pos -= p->spaces + 1;  /* get rid of spaces and original quote */
-      entry_pos = p->entry_pos;
-      /*lint -fallthrough */
-    case FIELD_NOT_BEGUN:
-    case FIELD_BEGUN:
-      /* Unnecessary:
-      quoted = p->quoted, pstate = p->pstate;
-      spaces = p->spaces, entry_pos = p->entry_pos;
-      */
-      SUBMIT_FIELD(p);
-      SUBMIT_ROW(p, -1);
-      break;
-    case ROW_NOT_BEGUN: /* Already ended properly */
-      ;
-  }
-
-  /* Reset parser */
-  p->spaces = p->quoted = p->entry_pos = p->status = 0;
-  p->pstate = ROW_NOT_BEGUN;
-
-  return 0;
-}
-
-void
-csv_set_delim(struct csv_parser *p, unsigned char c)
-{
-  /* Set the delimiter */
-  if (p) p->delim_char = c;
-}
-
-void
-csv_set_quote(struct csv_parser *p, unsigned char c)
-{
-  /* Set the quote character */
-  if (p) p->quote_char = c;
-}
-
-unsigned char
-csv_get_delim(const struct csv_parser *p)
-{
-  assert(p && "received null csv_parser");
-
-  /* Get the delimiter */
-  return p->delim_char;
-}
-
-unsigned char
-csv_get_quote(const struct csv_parser *p)
-{
-  assert(p && "received null csv_parser");
-
-  /* Get the quote character */
-  return p->quote_char;
-}
-
-void
-csv_set_space_func(struct csv_parser *p, int (*f)(unsigned char))
-{
-  /* Set the space function */
-  if (p) p->is_space = f;
-}
-
-void
-csv_set_term_func(struct csv_parser *p, int (*f)(unsigned char))
-{
-  /* Set the term function */
-  if (p) p->is_term = f;
-}
-
-void
-csv_set_realloc_func(struct csv_parser *p, void *(*f)(void *, size_t))
-{
-  /* Set the realloc function used to increase buffer size */
-  if (p && f) p->realloc_func = f;
-}
-
-void
-csv_set_free_func(struct csv_parser *p, void (*f)(void *))
-{
-  /* Set the free function used to free the buffer */
-  if (p && f) p->free_func = f;
-}
-
-void
-csv_set_blk_size(struct csv_parser *p, size_t size)
-{
-  /* Set the block size used to increment buffer size */
-  if (p) p->blk_size = size;
-}
-
-size_t
-csv_get_buffer_size(const struct csv_parser *p)
-{
-  /* Get the size of the entry buffer */
-  if (p)
-    return p->entry_size;
-  return 0;
-}
-
-static int
-csv_increase_buffer(struct csv_parser *p)
-{
-  size_t to_add;
-  void *vp;
-
-  if (p == NULL) return 0;
-  if (p->realloc_func == NULL) return 0;
-
-  /* Increase the size of the entry buffer.  Attempt to increase size by
-   * p->blk_size, if this is larger than SIZE_MAX try to increase current
-   * buffer size to SIZE_MAX.  If allocation fails, try to allocate halve
-   * the size and try again until successful or increment size is zero.
-   */
-
-  to_add = p->blk_size;
-
-  if ( p->entry_size >= SIZE_MAX - to_add )
-    to_add = SIZE_MAX - p->entry_size;
-
-  if (!to_add) {
-    p->status = CSV_ETOOBIG;
-    return -1;
-  }
-
-  while ((vp = p->realloc_func(p->entry_buf, p->entry_size + to_add)) == NULL) {
-    to_add /= 2;
-    if (!to_add) {
-      p->status = CSV_ENOMEM;
-      return -1;
-    }
-  }
-
-  /* Update entry buffer pointer and entry_size if successful */
-  p->entry_buf = vp;
-  p->entry_size += to_add;
-  return 0;
-}
-
-size_t
-csv_parse(struct csv_parser *p, const void *s, size_t len, void (*cb1)(void *, size_t, void *), void (*cb2)(int c, void *), void *data)
-{
-  unsigned const char *us = s;  /* Access input data as array of unsigned char */
-  unsigned char c;              /* The character we are currently processing */
-  size_t pos = 0;               /* The number of characters we have processed in this call */
-
-  /* Store key fields into local variables for performance */
-  unsigned char delim = p->delim_char;
-  unsigned char quote = p->quote_char;
-  int (*is_space)(unsigned char) = p->is_space;
-  int (*is_term)(unsigned char) = p->is_term;
-  int quoted = p->quoted;
-  int pstate = p->pstate;
-  size_t spaces = p->spaces;
-  size_t entry_pos = p->entry_pos;
-
-
-  if (!p->entry_buf && pos < len) {
-    /* Buffer hasn't been allocated yet and len > 0 */
-    if (csv_increase_buffer(p) != 0) {
-      p->quoted = quoted, p->pstate = pstate, p->spaces = spaces, p->entry_pos = entry_pos;
-      return pos;
-    }
-  }
-
-  while (pos < len) {
-    /* Check memory usage, increase buffer if necessary */
-    if (entry_pos == ((p->options & CSV_APPEND_NULL) ? p->entry_size - 1 : p->entry_size) ) {
-      if (csv_increase_buffer(p) != 0) {
-        p->quoted = quoted, p->pstate = pstate, p->spaces = spaces, p->entry_pos = entry_pos;
-        return pos;
-      }
-    }
-
-    c = us[pos++];
-
-    switch (pstate) {
-      case ROW_NOT_BEGUN:
-      case FIELD_NOT_BEGUN:
-        if ((is_space ? is_space(c) : c == CSV_SPACE || c == CSV_TAB) && c!=delim) { /* Space or Tab */
-          continue;
-        } else if (is_term ? is_term(c) : c == CSV_CR || c == CSV_LF) { /* Carriage Return or Line Feed */
-          if (pstate == FIELD_NOT_BEGUN) {
-            SUBMIT_FIELD(p);
-            SUBMIT_ROW(p, c);
-          } else {  /* ROW_NOT_BEGUN */
-            /* Don't submit empty rows by default */
-            if (p->options & CSV_REPALL_NL) {
-              SUBMIT_ROW(p, c);
-            }
-          }
-          continue;
-        } else if (c == delim) { /* Comma */
-          SUBMIT_FIELD(p);
-          break;
-        } else if (c == quote) { /* Quote */
-          pstate = FIELD_BEGUN;
-          quoted = 1;
-        } else {               /* Anything else */
-          pstate = FIELD_BEGUN;
-          quoted = 0;
-          SUBMIT_CHAR(p, c);
-        }
-        break;
-      case FIELD_BEGUN:
-        if (c == quote) {         /* Quote */
-          if (quoted) {
-            SUBMIT_CHAR(p, c);
-            pstate = FIELD_MIGHT_HAVE_ENDED;
-          } else {
-            /* STRICT ERROR - double quote inside non-quoted field */
-            if (p->options & CSV_STRICT) {
-              p->status = CSV_EPARSE;
-              p->quoted = quoted, p->pstate = pstate, p->spaces = spaces, p->entry_pos = entry_pos;
-              return pos-1;
-            }
-            SUBMIT_CHAR(p, c);
-            spaces = 0;
-          }
-        } else if (c == delim) {  /* Comma */
-          if (quoted) {
-            SUBMIT_CHAR(p, c);
-          } else {
-            SUBMIT_FIELD(p);
-          }
-        } else if (is_term ? is_term(c) : c == CSV_CR || c == CSV_LF) {  /* Carriage Return or Line Feed */
-          if (!quoted) {
-            SUBMIT_FIELD(p);
-            SUBMIT_ROW(p, c);
-          } else {
-            SUBMIT_CHAR(p, c);
-          }
-        } else if (!quoted && (is_space? is_space(c) : c == CSV_SPACE || c == CSV_TAB)) { /* Tab or space for non-quoted field */
-            SUBMIT_CHAR(p, c);
-            spaces++;
-        } else {  /* Anything else */
-          SUBMIT_CHAR(p, c);
-          spaces = 0;
-        }
-        break;
-      case FIELD_MIGHT_HAVE_ENDED:
-        /* This only happens when a quote character is encountered in a quoted field */
-        if (c == delim) {  /* Comma */
-          entry_pos -= spaces + 1;  /* get rid of spaces and original quote */
-          SUBMIT_FIELD(p);
-        } else if (is_term ? is_term(c) : c == CSV_CR || c == CSV_LF) {  /* Carriage Return or Line Feed */
-          entry_pos -= spaces + 1;  /* get rid of spaces and original quote */
-          SUBMIT_FIELD(p);
-          SUBMIT_ROW(p, c);
-        } else if (is_space ? is_space(c) : c == CSV_SPACE || c == CSV_TAB) {  /* Space or Tab */
-          SUBMIT_CHAR(p, c);
-          spaces++;
-        } else if (c == quote) {  /* Quote */
-          if (spaces) {
-            /* STRICT ERROR - unescaped double quote */
-            if (p->options & CSV_STRICT) {
-              p->status = CSV_EPARSE;
-              p->quoted = quoted, p->pstate = pstate, p->spaces = spaces, p->entry_pos = entry_pos;
-              return pos-1;
-            }
-            spaces = 0;
-            SUBMIT_CHAR(p, c);
-          } else {
-            /* Two quotes in a row */
-            pstate = FIELD_BEGUN;
-          }
-        } else {  /* Anything else */
-          /* STRICT ERROR - unescaped double quote */
-          if (p->options & CSV_STRICT) {
-            p->status = CSV_EPARSE;
-            p->quoted = quoted, p->pstate = pstate, p->spaces = spaces, p->entry_pos = entry_pos;
-            return pos-1;
-          }
-          pstate = FIELD_BEGUN;
-          spaces = 0;
-          SUBMIT_CHAR(p, c);
-        }
-        break;
-     default:
-       break;
-    }
-  }
-  p->quoted = quoted, p->pstate = pstate, p->spaces = spaces, p->entry_pos = entry_pos;
-  return pos;
-}
-
-size_t
-csv_write (void *dest, size_t dest_size, const void *src, size_t src_size)
-{
-  return csv_write2(dest, dest_size, src, src_size, CSV_QUOTE);
-}
-
-int
-csv_fwrite (FILE *fp, const void *src, size_t src_size)
-{
-  return csv_fwrite2(fp, src, src_size, CSV_QUOTE);
-}
-
-size_t
-csv_write2 (void *dest, size_t dest_size, const void *src, size_t src_size, unsigned char quote)
-{
-  unsigned char *cdest = dest;
-  const unsigned char *csrc = src;
-  size_t chars = 0;
-
-  if (src == NULL)
-    return 0;
-
-  if (dest == NULL)
-    dest_size = 0;
-
-  if (dest_size > 0)
-    *cdest++ = quote;
-  chars++;
-
-  while (src_size) {
-    if (*csrc == quote) {
-      if (dest_size > chars)
-        *cdest++ = quote;
-      if (chars < SIZE_MAX) chars++;
-    }
-    if (dest_size > chars)
-      *cdest++ = *csrc;
-    if (chars < SIZE_MAX) chars++;
-    src_size--;
-    csrc++;
-  }
-
-  if (dest_size > chars)
-    *cdest = quote;
-  if (chars < SIZE_MAX) chars++;
-
-  return chars;
-}
-
-int
-csv_fwrite2 (FILE *fp, const void *src, size_t src_size, unsigned char quote)
-{
-  const unsigned char *csrc = src;
-
-  if (fp == NULL || src == NULL)
-    return 0;
-
-  if (fputc(quote, fp) == EOF)
-    return EOF;
-
-  while (src_size) {
-    if (*csrc == quote) {
-      if (fputc(quote, fp) == EOF)
-        return EOF;
-    }
-    if (fputc(*csrc, fp) == EOF)
-      return EOF;
-    src_size--;
-    csrc++;
-  }
-
-  if (fputc(quote, fp) == EOF) {
-    return EOF;
-  }
-
-  return 0;
-}
diff --git a/src/include/utils/load/ag_load_edges.h b/src/include/utils/load/ag_load_edges.h
index df663b1d..4db00d93 100644
--- a/src/include/utils/load/ag_load_edges.h
+++ b/src/include/utils/load/ag_load_edges.h
@@ -17,42 +17,28 @@
  * under the License.
  */
 
-#include "access/heapam.h"
-#include "utils/load/age_load.h"
-
 #ifndef AG_LOAD_EDGES_H
 #define AG_LOAD_EDGES_H
 
-typedef struct {
-    size_t row;
-    char **header;
-    size_t *header_len;
-    size_t header_num;
-    char **fields;
-    size_t *fields_len;
-    size_t alloc;
-    size_t cur_field;
-    int error;
-    size_t header_row_length;
-    size_t curr_row_length;
-    char *graph_name;
-    Oid graph_oid;
-    char *label_name;
-    int label_id;
-    Oid label_seq_relid;
-    char *start_vertex;
-    char *end_vertex;
-    bool load_as_agtype;
-    batch_insert_state *batch_state;
-} csv_edge_reader;
-
-
-void edge_field_cb(void *field, size_t field_len, void *data);
-void edge_row_cb(int delim __attribute__((unused)), void *data);
+#include "utils/load/age_load.h"
 
+/*
+ * Load edges from a CSV file using pg's COPY infrastructure.
+ *
+ * CSV format: start_id, start_vertex_type, end_id, end_vertex_type, [properties...]
+ *
+ * Parameters:
+ *   file_path       - Path to the CSV file (must be in /tmp/age/)
+ *   graph_name      - Name of the graph
+ *   graph_oid       - OID of the graph
+ *   label_name      - Name of the edge label
+ *   label_id        - ID of the label
+ *   load_as_agtype  - If true, parse CSV values as agtype (JSON-like)
+ *
+ * Returns EXIT_SUCCESS on success.
+ */
 int create_edges_from_csv_file(char *file_path, char *graph_name, Oid graph_oid,
-                                char *label_name, int label_id,
-                                bool load_as_agtype);
-
-#endif /*AG_LOAD_EDGES_H */
+                               char *label_name, int label_id,
+                               bool load_as_agtype);
 
+#endif /* AG_LOAD_EDGES_H */
diff --git a/src/include/utils/load/ag_load_labels.h b/src/include/utils/load/ag_load_labels.h
index b8ed1572..c3d517f3 100644
--- a/src/include/utils/load/ag_load_labels.h
+++ b/src/include/utils/load/ag_load_labels.h
@@ -17,46 +17,26 @@
  * under the License.
  */
 
-
 #ifndef AG_LOAD_LABELS_H
 #define AG_LOAD_LABELS_H
 
-#include "access/heapam.h"
 #include "utils/load/age_load.h"
 
-struct counts {
-    long unsigned fields;
-    long unsigned allvalues;
-    long unsigned rows;
-};
-
-typedef struct {
-    size_t row;
-    char **header;
-    size_t *header_len;
-    size_t header_num;
-    char **fields;
-    size_t *fields_len;
-    size_t alloc;
-    size_t cur_field;
-    int error;
-    size_t header_row_length;
-    size_t curr_row_length;
-    char *graph_name;
-    Oid graph_oid;
-    char *label_name;
-    int label_id;
-    Oid label_seq_relid;
-    bool id_field_exists;
-    bool load_as_agtype;
-    int curr_seq_num;
-    batch_insert_state *batch_state;
-} csv_vertex_reader;
-
-
-void vertex_field_cb(void *field, size_t field_len, void *data);
-void vertex_row_cb(int delim __attribute__((unused)), void *data);
-
+/*
+ * Load vertex labels from a CSV file using pg's COPY infrastructure.
+ * CSV format: [id,] [properties...]
+ *
+ * Parameters:
+ *   file_path       - Path to the CSV file (must be in /tmp/age/)
+ *   graph_name      - Name of the graph
+ *   graph_oid       - OID of the graph
+ *   label_name      - Name of the vertex label
+ *   label_id        - ID of the label
+ *   id_field_exists - If true, first CSV column contains the vertex ID
+ *   load_as_agtype  - If true, parse CSV values as agtype (JSON-like)
+ *
+ * Returns EXIT_SUCCESS on success.
+ */
 int create_labels_from_csv_file(char *file_path, char *graph_name, Oid graph_oid,
                                 char *label_name, int label_id,
                                 bool id_field_exists, bool load_as_agtype);
diff --git a/src/include/utils/load/age_load.h b/src/include/utils/load/age_load.h
index 72f11493..6573c79f 100644
--- a/src/include/utils/load/age_load.h
+++ b/src/include/utils/load/age_load.h
@@ -17,6 +17,10 @@
  * under the License.
  */
 
+#ifndef AG_LOAD_H
+#define AG_LOAD_H
+
+#include "access/heapam.h"
 #include "commands/sequence.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -27,10 +31,8 @@
 #include "commands/graph_commands.h"
 #include "utils/ag_cache.h"
 
-#ifndef AGE_ENTITY_CREATOR_H
-#define AGE_ENTITY_CREATOR_H
-
 #define BATCH_SIZE 1000
+#define MAX_BUFFERED_BYTES 65535  /* 64KB, same as pg COPY */
 
 typedef struct batch_insert_state
 {
@@ -38,26 +40,29 @@ typedef struct batch_insert_state
     ResultRelInfo *resultRelInfo;
     TupleTableSlot **slots;
     int num_tuples;
-    int max_tuples;
+    size_t buffered_bytes;
+    BulkInsertState bistate;
 } batch_insert_state;
 
-agtype* create_empty_agtype(void);
-
-agtype* create_agtype_from_list(char **header, char **fields,
+agtype *create_empty_agtype(void);
+agtype *create_agtype_from_list(char **header, char **fields,
                                 size_t fields_len, int64 vertex_id,
                                 bool load_as_agtype);
-agtype* create_agtype_from_list_i(char **header, char **fields,
+agtype *create_agtype_from_list_i(char **header, char **fields,
                                   size_t fields_len, size_t start_index,
                                   bool load_as_agtype);
+
 void insert_vertex_simple(Oid graph_oid, char *label_name, graphid vertex_id,
                           agtype *vertex_properties);
 void insert_edge_simple(Oid graph_oid, char *label_name, graphid edge_id,
                         graphid start_id, graphid end_id,
-                        agtype* end_properties);
-void insert_batch(batch_insert_state *batch_state);
+                        agtype *edge_properties);
 
 void init_batch_insert(batch_insert_state **batch_state,
                        char *label_name, Oid graph_oid);
+void insert_batch(batch_insert_state *batch_state);
 void finish_batch_insert(batch_insert_state **batch_state);
 
-#endif /* AGE_ENTITY_CREATOR_H */
+char *trim_whitespace(const char *str);
+
+#endif /* AG_LOAD_H */
diff --git a/src/include/utils/load/csv.h b/src/include/utils/load/csv.h
deleted file mode 100644
index 06253697..00000000
--- a/src/include/utils/load/csv.h
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Created by Shoaib on 12/5/2021.
-*/
-
-/*
-libcsv - parse and write csv data
-Copyright (C) 2008-2021  Robert Gamble
-This library is free software; you can redistribute it and/or
-modify it under the terms of the GNU Lesser General Public
-License as published by the Free Software Foundation; either
-version 2.1 of the License, or (at your option) any later version.
-This library is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-Lesser General Public License for more details.
-You should have received a copy of the GNU Lesser General Public
-License along with this library; if not, write to the Free Software
-Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
-*/
-
-#ifndef LIBCSV_H__
-#define LIBCSV_H__
-#include <stdlib.h>
-#include <stdio.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-#define CSV_MAJOR 3
-#define CSV_MINOR 0
-#define CSV_RELEASE 3
-
-/* Error Codes */
-#define CSV_SUCCESS 0
-#define CSV_EPARSE 1   /* Parse error in strict mode */
-#define CSV_ENOMEM 2   /* Out of memory while increasing buffer size */
-#define CSV_ETOOBIG 3  /* Buffer larger than SIZE_MAX needed */
-#define CSV_EINVALID 4 /* Invalid code,should never be received from csv_error*/
-
-
-/* parser options */
-#define CSV_STRICT 1    /* enable strict mode */
-#define CSV_REPALL_NL 2 /* report all unquoted carriage returns and linefeeds */
-#define CSV_STRICT_FINI 4 /* causes csv_fini to return CSV_EPARSE if last
-                             field is quoted and doesn't contain ending
-                             quote */
-#define CSV_APPEND_NULL 8 /* Ensure that all fields are null-terminated */
-#define CSV_EMPTY_IS_NULL 16 /* Pass null pointer to cb1 function when
-                                empty, unquoted fields are encountered */
-
-
-/* Character values */
-#define CSV_TAB    0x09
-#define CSV_SPACE  0x20
-#define CSV_CR     0x0d
-#define CSV_LF     0x0a
-#define CSV_COMMA  0x2c
-#define CSV_QUOTE  0x22
-
-struct csv_parser {
-    int pstate;         /* Parser state */
-    int quoted;         /* Is the current field a quoted field? */
-    size_t spaces;      /* Number of continuous spaces after quote or in a non-quoted field */
-    unsigned char * entry_buf;   /* Entry buffer */
-    size_t entry_pos;   /* Current position in entry_buf (and current size of entry) */
-    size_t entry_size;  /* Size of entry buffer */
-    int status;         /* Operation status */
-    unsigned char options;
-    unsigned char quote_char;
-    unsigned char delim_char;
-    int (*is_space)(unsigned char);
-    int (*is_term)(unsigned char);
-    size_t blk_size;
-    void *(*malloc_func)(size_t);           /* not used */
-    void *(*realloc_func)(void *, size_t);  /* function used to allocate buffer memory */
-    void (*free_func)(void *);              /* function used to free buffer memory */
-};
-
-/* Function Prototypes */
-int csv_init(struct csv_parser *p, unsigned char options);
-int csv_fini(struct csv_parser *p, void (*cb1)(void *, size_t, void *), void (*cb2)(int, void *), void *data);
-void csv_free(struct csv_parser *p);
-int csv_error(const struct csv_parser *p);
-const char * csv_strerror(int error);
-size_t csv_parse(struct csv_parser *p, const void *s, size_t len, void (*cb1)(void *, size_t, void *), void (*cb2)(int, void *), void *data);
-size_t csv_write(void *dest, size_t dest_size, const void *src, size_t src_size);
-int csv_fwrite(FILE *fp, const void *src, size_t src_size);
-size_t csv_write2(void *dest, size_t dest_size, const void *src, size_t src_size, unsigned char quote);
-int csv_fwrite2(FILE *fp, const void *src, size_t src_size, unsigned char quote);
-int csv_get_opts(const struct csv_parser *p);
-int csv_set_opts(struct csv_parser *p, unsigned char options);
-void csv_set_delim(struct csv_parser *p, unsigned char c);
-void csv_set_quote(struct csv_parser *p, unsigned char c);
-unsigned char csv_get_delim(const struct csv_parser *p);
-unsigned char csv_get_quote(const struct csv_parser *p);
-void csv_set_space_func(struct csv_parser *p, int (*f)(unsigned char));
-void csv_set_term_func(struct csv_parser *p, int (*f)(unsigned char));
-void csv_set_realloc_func(struct csv_parser *p, void *(*)(void *, size_t));
-void csv_set_free_func(struct csv_parser *p, void (*)(void *));
-void csv_set_blk_size(struct csv_parser *p, size_t);
-size_t csv_get_buffer_size(const struct csv_parser *p);
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif


Reply via email to