This is an automated email from the ASF dual-hosted git repository.
jgemignani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/age.git
The following commit(s) were added to refs/heads/master by this push:
new b29ca5e7 Replace libcsv with pg COPY for csv loading (#2310)
b29ca5e7 is described below
commit b29ca5e7d2f84cfe2619eea70c4ace2cba41aa0b
Author: Muhammad Taha Naveed <[email protected]>
AuthorDate: Mon Jan 19 22:21:02 2026 +0500
Replace libcsv with pg COPY for csv loading (#2310)
- Commit also adds permission checks
- Resolves a critical memory spike issue on loading large files
- Use pg's COPY infrastructure (BeginCopyFrom, NextCopyFromRawFields)
for 64KB buffered CSV parsing instead of libcsv
- Add byte based flush threshold (64KB) matching COPY behavior for memory
safety
- Use heap_multi_insert with BulkInsertState for optimized batch inserts
- Add per batch memory context to prevent memory growth during large loads
- Remove libcsv dependency (libcsv.c, csv.h)
- Improves loading performance by 15-20%
- No previous regression tests were impacted
- Added regression tests for permissions/rls
Assisted-by: AI
---
Makefile | 1 -
regress/expected/age_load.out | 189 +++++++++++
regress/expected/index.out | 12 +-
regress/sql/age_load.sql | 125 ++++++++
regress/sql/index.sql | 2 +-
src/backend/utils/load/ag_load_edges.c | 388 +++++++++++-----------
src/backend/utils/load/ag_load_labels.c | 381 +++++++++++-----------
src/backend/utils/load/age_load.c | 248 +++++++++++++--
src/backend/utils/load/libcsv.c | 549 --------------------------------
src/include/utils/load/ag_load_edges.h | 52 ++-
src/include/utils/load/ag_load_labels.h | 50 +--
src/include/utils/load/age_load.h | 27 +-
src/include/utils/load/csv.h | 108 -------
13 files changed, 1000 insertions(+), 1132 deletions(-)
diff --git a/Makefile b/Makefile
index ffad7d6a..a8faa2bb 100644
--- a/Makefile
+++ b/Makefile
@@ -69,7 +69,6 @@ OBJS = src/backend/age.o \
src/backend/utils/load/ag_load_labels.o \
src/backend/utils/load/ag_load_edges.o \
src/backend/utils/load/age_load.o \
- src/backend/utils/load/libcsv.o \
src/backend/utils/name_validation.o \
src/backend/utils/ag_guc.o
diff --git a/regress/expected/age_load.out b/regress/expected/age_load.out
index 55d1ff1d..1f76c31c 100644
--- a/regress/expected/age_load.out
+++ b/regress/expected/age_load.out
@@ -454,6 +454,195 @@ NOTICE: graph "agload_conversion" has been dropped
(1 row)
+--
+-- Test security and permissions
+--
+SELECT create_graph('agload_security');
+NOTICE: graph "agload_security" has been created
+ create_graph
+--------------
+
+(1 row)
+
+SELECT create_vlabel('agload_security', 'Person1');
+NOTICE: VLabel "Person1" has been created
+ create_vlabel
+---------------
+
+(1 row)
+
+SELECT create_vlabel('agload_security', 'Person2');
+NOTICE: VLabel "Person2" has been created
+ create_vlabel
+---------------
+
+(1 row)
+
+SELECT create_elabel('agload_security', 'SecEdge');
+NOTICE: ELabel "SecEdge" has been created
+ create_elabel
+---------------
+
+(1 row)
+
+--
+-- Test 1: File read permission (pg_read_server_files role)
+--
+-- Create a user without pg_read_server_files role
+CREATE USER load_test_user;
+GRANT USAGE ON SCHEMA ag_catalog TO load_test_user;
+-- This should fail because load_test_user doesn't have pg_read_server_files
+SET ROLE load_test_user;
+SELECT load_labels_from_file('agload_security', 'Person1',
'age_load/conversion_vertices.csv', true);
+ERROR: permission denied to LOAD from a file
+DETAIL: Only roles with privileges of the "pg_read_server_files" role may
LOAD from a file.
+SELECT load_edges_from_file('agload_security', 'SecEdge',
'age_load/conversion_edges.csv');
+ERROR: permission denied to LOAD from a file
+DETAIL: Only roles with privileges of the "pg_read_server_files" role may
LOAD from a file.
+RESET ROLE;
+-- Grant pg_read_server_files and try again - should fail on table permission
now
+GRANT pg_read_server_files TO load_test_user;
+--
+-- Test 2: Table INSERT permission (ACL_INSERT)
+--
+-- User has file read permission but no INSERT on the label table
+SET ROLE load_test_user;
+SELECT load_labels_from_file('agload_security', 'Person1',
'age_load/conversion_vertices.csv', true);
+ERROR: permission denied for table Person1
+SELECT load_edges_from_file('agload_security', 'SecEdge',
'age_load/conversion_edges.csv');
+ERROR: permission denied for table SecEdge
+RESET ROLE;
+-- Grant INSERT permission and try again - should succeed
+GRANT USAGE ON SCHEMA agload_security TO load_test_user;
+GRANT INSERT ON agload_security."Person1" TO load_test_user;
+GRANT INSERT ON agload_security."SecEdge" TO load_test_user;
+GRANT UPDATE ON SEQUENCE agload_security."Person1_id_seq" TO load_test_user;
+GRANT UPDATE ON SEQUENCE agload_security."SecEdge_id_seq" TO load_test_user;
+GRANT SELECT ON ag_catalog.ag_label TO load_test_user;
+GRANT SELECT ON ag_catalog.ag_graph TO load_test_user;
+SET ROLE load_test_user;
+SELECT load_labels_from_file('agload_security', 'Person1',
'age_load/conversion_vertices.csv', true);
+ load_labels_from_file
+-----------------------
+
+(1 row)
+
+SELECT load_edges_from_file('agload_security', 'SecEdge',
'age_load/conversion_edges.csv');
+ load_edges_from_file
+----------------------
+
+(1 row)
+
+RESET ROLE;
+-- Verify data was loaded
+SELECT COUNT(*) FROM agload_security."Person1";
+ count
+-------
+ 6
+(1 row)
+
+SELECT COUNT(*) FROM agload_security."SecEdge";
+ count
+-------
+ 6
+(1 row)
+
+-- cleanup
+DELETE FROM agload_security."Person1";
+DELETE FROM agload_security."SecEdge";
+--
+-- Test 3: Row-Level Security (RLS)
+--
+-- Enable RLS on the label tables
+ALTER TABLE agload_security."Person1" ENABLE ROW LEVEL SECURITY;
+ALTER TABLE agload_security."SecEdge" ENABLE ROW LEVEL SECURITY;
+-- Switch to load_test_user
+SET ROLE load_test_user;
+-- Loading should fail when RLS is enabled
+SELECT load_labels_from_file('agload_security', 'Person1',
'age_load/conversion_vertices.csv', true);
+ERROR: LOAD from file is not supported with row-level security
+HINT: Use Cypher CREATE clause instead.
+SELECT load_edges_from_file('agload_security', 'SecEdge',
'age_load/conversion_edges.csv');
+ERROR: LOAD from file is not supported with row-level security
+HINT: Use Cypher CREATE clause instead.
+RESET ROLE;
+-- Disable RLS and try again - should succeed
+ALTER TABLE agload_security."Person1" DISABLE ROW LEVEL SECURITY;
+ALTER TABLE agload_security."SecEdge" DISABLE ROW LEVEL SECURITY;
+SELECT load_labels_from_file('agload_security', 'Person1',
'age_load/conversion_vertices.csv', true);
+ load_labels_from_file
+-----------------------
+
+(1 row)
+
+SELECT load_edges_from_file('agload_security', 'SecEdge',
'age_load/conversion_edges.csv');
+ load_edges_from_file
+----------------------
+
+(1 row)
+
+-- Verify data was loaded
+SELECT COUNT(*) FROM agload_security."Person1";
+ count
+-------
+ 6
+(1 row)
+
+SELECT COUNT(*) FROM agload_security."SecEdge";
+ count
+-------
+ 6
+(1 row)
+
+-- cleanup
+DELETE FROM agload_security."Person1";
+DELETE FROM agload_security."SecEdge";
+--
+-- Test 4: Constraint checking (CHECK constraint)
+--
+-- Add constraint on vertex properties - fail if bool property is false
+ALTER TABLE agload_security."Person1" ADD CONSTRAINT check_bool_true
+ CHECK ((properties->>'"bool"')::boolean = true);
+-- This should fail - constraint violation
+SELECT load_labels_from_file('agload_security', 'Person1',
'age_load/conversion_vertices.csv', true);
+ERROR: new row for relation "Person1" violates check constraint
"check_bool_true"
+DETAIL: Failing row contains (844424930131970, {"id": "2", "bool": "false",
"__id__": 2, "string": "John", "num...).
+-- Add constraint on edge properties - fail if bool property is false
+ALTER TABLE agload_security."SecEdge" ADD CONSTRAINT check_bool_true
+ CHECK ((properties->>'"bool"')::boolean = true);
+-- This should fail - some edges have bool = false
+SELECT load_edges_from_file('agload_security', 'SecEdge',
'age_load/conversion_edges.csv');
+ERROR: new row for relation "SecEdge" violates check constraint
"check_bool_true"
+DETAIL: Failing row contains (1407374883553294, 844424930131969,
1125899906842625, {"bool": "false", "string": "John", "numeric": "-2"}).
+-- cleanup
+ALTER TABLE agload_security."Person1" DROP CONSTRAINT check_bool_true;
+ALTER TABLE agload_security."SecEdge" DROP CONSTRAINT check_bool_true;
+--
+-- Cleanup
+--
+REVOKE ALL ON agload_security."Person1" FROM load_test_user;
+REVOKE ALL ON agload_security."SecEdge" FROM load_test_user;
+REVOKE ALL ON SEQUENCE agload_security."Person1_id_seq" FROM load_test_user;
+REVOKE ALL ON SEQUENCE agload_security."SecEdge_id_seq" FROM load_test_user;
+REVOKE ALL ON ag_catalog.ag_label FROM load_test_user;
+REVOKE ALL ON ag_catalog.ag_graph FROM load_test_user;
+REVOKE ALL ON SCHEMA agload_security FROM load_test_user;
+REVOKE ALL ON SCHEMA ag_catalog FROM load_test_user;
+REVOKE pg_read_server_files FROM load_test_user;
+DROP USER load_test_user;
+SELECT drop_graph('agload_security', true);
+NOTICE: drop cascades to 5 other objects
+DETAIL: drop cascades to table agload_security._ag_label_vertex
+drop cascades to table agload_security._ag_label_edge
+drop cascades to table agload_security."Person1"
+drop cascades to table agload_security."Person2"
+drop cascades to table agload_security."SecEdge"
+NOTICE: graph "agload_security" has been dropped
+ drop_graph
+------------
+
+(1 row)
+
--
-- End
--
diff --git a/regress/expected/index.out b/regress/expected/index.out
index 745cab26..ec62bf57 100644
--- a/regress/expected/index.out
+++ b/regress/expected/index.out
@@ -264,19 +264,19 @@ $$) as (n agtype);
(0 rows)
-- Verify that the indices are created on id columns
-SELECT indexname, indexdef FROM pg_indexes WHERE schemaname= 'cypher_index';
+SELECT indexname, indexdef FROM pg_indexes WHERE schemaname= 'cypher_index'
ORDER BY 1;
indexname |
indexdef
-----------------------------+------------------------------------------------------------------------------------------------
+ City_pkey | CREATE UNIQUE INDEX "City_pkey" ON
cypher_index."City" USING btree (id)
+ Country_pkey | CREATE UNIQUE INDEX "Country_pkey" ON
cypher_index."Country" USING btree (id)
+ _ag_label_edge_end_id_idx | CREATE INDEX _ag_label_edge_end_id_idx ON
cypher_index._ag_label_edge USING btree (end_id)
_ag_label_edge_pkey | CREATE UNIQUE INDEX _ag_label_edge_pkey ON
cypher_index._ag_label_edge USING btree (id)
_ag_label_edge_start_id_idx | CREATE INDEX _ag_label_edge_start_id_idx ON
cypher_index._ag_label_edge USING btree (start_id)
- _ag_label_edge_end_id_idx | CREATE INDEX _ag_label_edge_end_id_idx ON
cypher_index._ag_label_edge USING btree (end_id)
_ag_label_vertex_pkey | CREATE UNIQUE INDEX _ag_label_vertex_pkey ON
cypher_index._ag_label_vertex USING btree (id)
- idx_pkey | CREATE UNIQUE INDEX idx_pkey ON
cypher_index.idx USING btree (id)
cypher_index_idx_props_uq | CREATE UNIQUE INDEX cypher_index_idx_props_uq
ON cypher_index.idx USING btree (properties)
- Country_pkey | CREATE UNIQUE INDEX "Country_pkey" ON
cypher_index."Country" USING btree (id)
- has_city_start_id_idx | CREATE INDEX has_city_start_id_idx ON
cypher_index.has_city USING btree (start_id)
has_city_end_id_idx | CREATE INDEX has_city_end_id_idx ON
cypher_index.has_city USING btree (end_id)
- City_pkey | CREATE UNIQUE INDEX "City_pkey" ON
cypher_index."City" USING btree (id)
+ has_city_start_id_idx | CREATE INDEX has_city_start_id_idx ON
cypher_index.has_city USING btree (start_id)
+ idx_pkey | CREATE UNIQUE INDEX idx_pkey ON
cypher_index.idx USING btree (id)
(10 rows)
SET enable_mergejoin = ON;
diff --git a/regress/sql/age_load.sql b/regress/sql/age_load.sql
index cefcfb4c..976f050a 100644
--- a/regress/sql/age_load.sql
+++ b/regress/sql/age_load.sql
@@ -194,6 +194,131 @@ SELECT load_edges_from_file('agload_conversion',
'Edges1', '../../etc/passwd', t
--
SELECT drop_graph('agload_conversion', true);
+--
+-- Test security and permissions
+--
+
+SELECT create_graph('agload_security');
+SELECT create_vlabel('agload_security', 'Person1');
+SELECT create_vlabel('agload_security', 'Person2');
+SELECT create_elabel('agload_security', 'SecEdge');
+
+--
+-- Test 1: File read permission (pg_read_server_files role)
+--
+-- Create a user without pg_read_server_files role
+CREATE USER load_test_user;
+GRANT USAGE ON SCHEMA ag_catalog TO load_test_user;
+
+-- This should fail because load_test_user doesn't have pg_read_server_files
+SET ROLE load_test_user;
+SELECT load_labels_from_file('agload_security', 'Person1',
'age_load/conversion_vertices.csv', true);
+SELECT load_edges_from_file('agload_security', 'SecEdge',
'age_load/conversion_edges.csv');
+RESET ROLE;
+
+-- Grant pg_read_server_files and try again - should fail on table permission
now
+GRANT pg_read_server_files TO load_test_user;
+
+--
+-- Test 2: Table INSERT permission (ACL_INSERT)
+--
+-- User has file read permission but no INSERT on the label table
+SET ROLE load_test_user;
+SELECT load_labels_from_file('agload_security', 'Person1',
'age_load/conversion_vertices.csv', true);
+SELECT load_edges_from_file('agload_security', 'SecEdge',
'age_load/conversion_edges.csv');
+RESET ROLE;
+
+-- Grant INSERT permission and try again - should succeed
+GRANT USAGE ON SCHEMA agload_security TO load_test_user;
+GRANT INSERT ON agload_security."Person1" TO load_test_user;
+GRANT INSERT ON agload_security."SecEdge" TO load_test_user;
+GRANT UPDATE ON SEQUENCE agload_security."Person1_id_seq" TO load_test_user;
+GRANT UPDATE ON SEQUENCE agload_security."SecEdge_id_seq" TO load_test_user;
+GRANT SELECT ON ag_catalog.ag_label TO load_test_user;
+GRANT SELECT ON ag_catalog.ag_graph TO load_test_user;
+
+SET ROLE load_test_user;
+SELECT load_labels_from_file('agload_security', 'Person1',
'age_load/conversion_vertices.csv', true);
+SELECT load_edges_from_file('agload_security', 'SecEdge',
'age_load/conversion_edges.csv');
+RESET ROLE;
+
+-- Verify data was loaded
+SELECT COUNT(*) FROM agload_security."Person1";
+SELECT COUNT(*) FROM agload_security."SecEdge";
+
+-- cleanup
+DELETE FROM agload_security."Person1";
+DELETE FROM agload_security."SecEdge";
+
+--
+-- Test 3: Row-Level Security (RLS)
+--
+
+-- Enable RLS on the label tables
+ALTER TABLE agload_security."Person1" ENABLE ROW LEVEL SECURITY;
+ALTER TABLE agload_security."SecEdge" ENABLE ROW LEVEL SECURITY;
+
+-- Switch to load_test_user
+SET ROLE load_test_user;
+
+-- Loading should fail when RLS is enabled
+SELECT load_labels_from_file('agload_security', 'Person1',
'age_load/conversion_vertices.csv', true);
+SELECT load_edges_from_file('agload_security', 'SecEdge',
'age_load/conversion_edges.csv');
+
+RESET ROLE;
+
+-- Disable RLS and try again - should succeed
+ALTER TABLE agload_security."Person1" DISABLE ROW LEVEL SECURITY;
+ALTER TABLE agload_security."SecEdge" DISABLE ROW LEVEL SECURITY;
+
+SELECT load_labels_from_file('agload_security', 'Person1',
'age_load/conversion_vertices.csv', true);
+SELECT load_edges_from_file('agload_security', 'SecEdge',
'age_load/conversion_edges.csv');
+
+-- Verify data was loaded
+SELECT COUNT(*) FROM agload_security."Person1";
+SELECT COUNT(*) FROM agload_security."SecEdge";
+
+-- cleanup
+DELETE FROM agload_security."Person1";
+DELETE FROM agload_security."SecEdge";
+
+--
+-- Test 4: Constraint checking (CHECK constraint)
+--
+
+-- Add constraint on vertex properties - fail if bool property is false
+ALTER TABLE agload_security."Person1" ADD CONSTRAINT check_bool_true
+ CHECK ((properties->>'"bool"')::boolean = true);
+
+-- This should fail - constraint violation
+SELECT load_labels_from_file('agload_security', 'Person1',
'age_load/conversion_vertices.csv', true);
+
+-- Add constraint on edge properties - fail if bool property is false
+ALTER TABLE agload_security."SecEdge" ADD CONSTRAINT check_bool_true
+ CHECK ((properties->>'"bool"')::boolean = true);
+
+-- This should fail - some edges have bool = false
+SELECT load_edges_from_file('agload_security', 'SecEdge',
'age_load/conversion_edges.csv');
+
+-- cleanup
+ALTER TABLE agload_security."Person1" DROP CONSTRAINT check_bool_true;
+ALTER TABLE agload_security."SecEdge" DROP CONSTRAINT check_bool_true;
+
+--
+-- Cleanup
+--
+REVOKE ALL ON agload_security."Person1" FROM load_test_user;
+REVOKE ALL ON agload_security."SecEdge" FROM load_test_user;
+REVOKE ALL ON SEQUENCE agload_security."Person1_id_seq" FROM load_test_user;
+REVOKE ALL ON SEQUENCE agload_security."SecEdge_id_seq" FROM load_test_user;
+REVOKE ALL ON ag_catalog.ag_label FROM load_test_user;
+REVOKE ALL ON ag_catalog.ag_graph FROM load_test_user;
+REVOKE ALL ON SCHEMA agload_security FROM load_test_user;
+REVOKE ALL ON SCHEMA ag_catalog FROM load_test_user;
+REVOKE pg_read_server_files FROM load_test_user;
+DROP USER load_test_user;
+SELECT drop_graph('agload_security', true);
+
--
-- End
--
diff --git a/regress/sql/index.sql b/regress/sql/index.sql
index a6e075c7..d4a4b24a 100644
--- a/regress/sql/index.sql
+++ b/regress/sql/index.sql
@@ -165,7 +165,7 @@ SELECT * FROM cypher('cypher_index', $$
$$) as (n agtype);
-- Verify that the indices are created on id columns
-SELECT indexname, indexdef FROM pg_indexes WHERE schemaname= 'cypher_index';
+SELECT indexname, indexdef FROM pg_indexes WHERE schemaname= 'cypher_index'
ORDER BY 1;
SET enable_mergejoin = ON;
SET enable_hashjoin = OFF;
diff --git a/src/backend/utils/load/ag_load_edges.c
b/src/backend/utils/load/ag_load_edges.c
index 931c6e0d..c05bf335 100644
--- a/src/backend/utils/load/ag_load_edges.c
+++ b/src/backend/utils/load/ag_load_edges.c
@@ -16,50 +16,30 @@
* specific language governing permissions and limitations
* under the License.
*/
-
#include "postgres.h"
-#include "utils/load/ag_load_edges.h"
-#include "utils/load/csv.h"
+#include "access/heapam.h"
+#include "access/table.h"
+#include "catalog/namespace.h"
+#include "commands/copy.h"
+#include "executor/executor.h"
+#include "nodes/makefuncs.h"
+#include "parser/parse_node.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
-void edge_field_cb(void *field, size_t field_len, void *data)
-{
-
- csv_edge_reader *cr = (csv_edge_reader*)data;
- if (cr->error)
- {
- cr->error = 1;
- ereport(NOTICE,(errmsg("There is some unknown error")));
- }
-
- /* check for space to store this field */
- if (cr->cur_field == cr->alloc)
- {
- cr->alloc *= 2;
- cr->fields = repalloc_check(cr->fields, sizeof(char *) * cr->alloc);
- cr->fields_len = repalloc_check(cr->header, sizeof(size_t *) *
cr->alloc);
- if (cr->fields == NULL)
- {
- cr->error = 1;
- ereport(ERROR,
- (errmsg("field_cb: failed to reallocate %zu bytes\n",
- sizeof(char *) * cr->alloc)));
- }
- }
- cr->fields_len[cr->cur_field] = field_len;
- cr->curr_row_length += field_len;
- cr->fields[cr->cur_field] = pnstrdup((char*)field, field_len);
- cr->cur_field += 1;
-}
+#include "utils/load/ag_load_edges.h"
-/* Parser calls this function when it detects end of a row */
-void edge_row_cb(int delim __attribute__((unused)), void *data)
+/*
+ * Process a single edge row from COPY's raw fields.
+ * Edge CSV format: start_id, start_vertex_type, end_id, end_vertex_type,
[properties...]
+ */
+static void process_edge_row(char **fields, int nfields,
+ char **header, int header_count,
+ int label_id, Oid label_seq_relid,
+ Oid graph_oid, bool load_as_agtype,
+ batch_insert_state *batch_state)
{
-
- csv_edge_reader *cr = (csv_edge_reader*)data;
- batch_insert_state *batch_state = cr->batch_state;
-
- size_t i, n_fields;
int64 start_id_int;
graphid start_vertex_graph_id;
int start_vertex_type_id;
@@ -72,104 +52,92 @@ void edge_row_cb(int delim __attribute__((unused)), void
*data)
int64 entry_id;
TupleTableSlot *slot;
- n_fields = cr->cur_field;
+ char *start_vertex_type;
+ char *end_vertex_type;
+ agtype *edge_properties;
- if (cr->row == 0)
- {
- cr->header_num = cr->cur_field;
- cr->header_row_length = cr->curr_row_length;
- cr->header_len = (size_t* )palloc(sizeof(size_t *) * cr->cur_field);
- cr->header = palloc((sizeof (char*) * cr->cur_field));
+ /* Generate edge ID */
+ entry_id = nextval_internal(label_seq_relid, true);
+ edge_id = make_graphid(label_id, entry_id);
- for (i = 0; i<cr->cur_field; i++)
- {
- cr->header_len[i] = cr->fields_len[i];
- cr->header[i] = pnstrdup(cr->fields[i], cr->header_len[i]);
- }
- }
- else
- {
- entry_id = nextval_internal(cr->label_seq_relid, true);
- edge_id = make_graphid(cr->label_id, entry_id);
-
- start_id_int = strtol(cr->fields[0], NULL, 10);
- start_vertex_type_id = get_label_id(cr->fields[1], cr->graph_oid);
- end_id_int = strtol(cr->fields[2], NULL, 10);
- end_vertex_type_id = get_label_id(cr->fields[3], cr->graph_oid);
-
- start_vertex_graph_id = make_graphid(start_vertex_type_id,
start_id_int);
- end_vertex_graph_id = make_graphid(end_vertex_type_id, end_id_int);
-
- /* Get the appropriate slot from the batch state */
- slot = batch_state->slots[batch_state->num_tuples];
-
- /* Clear the slots contents */
- ExecClearTuple(slot);
-
- /* Fill the values in the slot */
- slot->tts_values[0] = GRAPHID_GET_DATUM(edge_id);
- slot->tts_values[1] = GRAPHID_GET_DATUM(start_vertex_graph_id);
- slot->tts_values[2] = GRAPHID_GET_DATUM(end_vertex_graph_id);
- slot->tts_values[3] = AGTYPE_P_GET_DATUM(
- create_agtype_from_list_i(
- cr->header, cr->fields,
- n_fields, 4, cr->load_as_agtype));
- slot->tts_isnull[0] = false;
- slot->tts_isnull[1] = false;
- slot->tts_isnull[2] = false;
- slot->tts_isnull[3] = false;
-
- /* Make the slot as containing virtual tuple */
- ExecStoreVirtualTuple(slot);
- batch_state->num_tuples++;
-
- if (batch_state->num_tuples >= batch_state->max_tuples)
- {
- /* Insert the batch when it is full (i.e. BATCH_SIZE) */
- insert_batch(batch_state);
- batch_state->num_tuples = 0;
- }
- }
+ /* Trim whitespace from vertex type names */
+ start_vertex_type = trim_whitespace(fields[1]);
+ end_vertex_type = trim_whitespace(fields[3]);
- for (i = 0; i < n_fields; ++i)
- {
- pfree_if_not_null(cr->fields[i]);
- }
+ /* Parse start vertex info */
+ start_id_int = strtol(fields[0], NULL, 10);
+ start_vertex_type_id = get_label_id(start_vertex_type, graph_oid);
- if (cr->error)
- {
- ereport(NOTICE,(errmsg("THere is some error")));
- }
+ /* Parse end vertex info */
+ end_id_int = strtol(fields[2], NULL, 10);
+ end_vertex_type_id = get_label_id(end_vertex_type, graph_oid);
- cr->cur_field = 0;
- cr->curr_row_length = 0;
- cr->row += 1;
-}
+ /* Create graphids for start and end vertices */
+ start_vertex_graph_id = make_graphid(start_vertex_type_id, start_id_int);
+ end_vertex_graph_id = make_graphid(end_vertex_type_id, end_id_int);
-static int is_space(unsigned char c)
-{
- if (c == CSV_SPACE || c == CSV_TAB)
- {
- return 1;
- }
- else
+ /* Get the appropriate slot from the batch state */
+ slot = batch_state->slots[batch_state->num_tuples];
+
+    /* Clear the slot's contents */
+ ExecClearTuple(slot);
+
+ /* Build the agtype properties */
+ edge_properties = create_agtype_from_list_i(header, fields,
+ nfields, 4, load_as_agtype);
+
+ /* Fill the values in the slot */
+ slot->tts_values[0] = GRAPHID_GET_DATUM(edge_id);
+ slot->tts_values[1] = GRAPHID_GET_DATUM(start_vertex_graph_id);
+ slot->tts_values[2] = GRAPHID_GET_DATUM(end_vertex_graph_id);
+ slot->tts_values[3] = AGTYPE_P_GET_DATUM(edge_properties);
+ slot->tts_isnull[0] = false;
+ slot->tts_isnull[1] = false;
+ slot->tts_isnull[2] = false;
+ slot->tts_isnull[3] = false;
+
+    /* Mark the slot as containing a virtual tuple */
+ ExecStoreVirtualTuple(slot);
+
+ batch_state->buffered_bytes += VARSIZE(edge_properties);
+ batch_state->num_tuples++;
+
+ /* Insert the batch when tuple count OR byte threshold is reached */
+ if (batch_state->num_tuples >= BATCH_SIZE ||
+ batch_state->buffered_bytes >= MAX_BUFFERED_BYTES)
{
- return 0;
+ insert_batch(batch_state);
+ batch_state->num_tuples = 0;
+ batch_state->buffered_bytes = 0;
}
}
-static int is_term(unsigned char c)
+/*
+ * Create COPY options for CSV parsing.
+ * Returns a List of DefElem nodes.
+ */
+static List *create_copy_options(void)
{
- if (c == CSV_CR || c == CSV_LF)
- {
- return 1;
- }
- else
- {
- return 0;
- }
+ List *options = NIL;
+
+ /* FORMAT csv */
+ options = lappend(options,
+ makeDefElem("format",
+ (Node *) makeString("csv"),
+ -1));
+
+ /* HEADER false - we'll read the header ourselves */
+ options = lappend(options,
+ makeDefElem("header",
+ (Node *) makeBoolean(false),
+ -1));
+
+ return options;
}
+/*
+ * Load edges from CSV file using pg's COPY infrastructure.
+ */
int create_edges_from_csv_file(char *file_path,
char *graph_name,
Oid graph_oid,
@@ -177,79 +145,133 @@ int create_edges_from_csv_file(char *file_path,
int label_id,
bool load_as_agtype)
{
+ Relation label_rel;
+ Oid label_relid;
+ CopyFromState cstate;
+ List *copy_options;
+ ParseState *pstate;
+ char **fields;
+ int nfields;
+ char **header = NULL;
+ int header_count = 0;
+ bool is_first_row = true;
+ char *label_seq_name;
+ Oid label_seq_relid;
+ batch_insert_state *batch_state = NULL;
+ MemoryContext batch_context;
+ MemoryContext old_context;
+
+ /* Create a memory context for batch processing - reset after each batch */
+ batch_context = AllocSetContextCreate(CurrentMemoryContext,
+ "AGE CSV Edge Load Batch Context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ /* Get the label relation */
+ label_relid = get_label_relation(label_name, graph_oid);
+ label_rel = table_open(label_relid, RowExclusiveLock);
+
+ /* Get sequence info */
+ label_seq_name = get_label_seq_relation_name(label_name);
+ label_seq_relid = get_relname_relid(label_seq_name, graph_oid);
+
+ /* Initialize the batch insert state */
+ init_batch_insert(&batch_state, label_name, graph_oid);
+
+ /* Create COPY options for CSV parsing */
+ copy_options = create_copy_options();
+
+ /* Create a minimal ParseState for BeginCopyFrom */
+ pstate = make_parsestate(NULL);
- FILE *fp;
- struct csv_parser p;
- char buf[1024];
- size_t bytes_read;
- unsigned char options = 0;
- csv_edge_reader cr;
- char *label_seq_name;
-
- if (csv_init(&p, options) != 0)
+ PG_TRY();
{
- ereport(ERROR,
- (errmsg("Failed to initialize csv parser\n")));
- }
-
- p.malloc_func = palloc;
- p.realloc_func = repalloc_check;
- p.free_func = pfree_if_not_null;
+ /*
+ * Initialize COPY FROM state.
+ * We pass the label relation but will only use NextCopyFromRawFields
+ * which returns raw parsed strings without type conversion.
+ */
+ cstate = BeginCopyFrom(pstate,
+ label_rel,
+ NULL, /* whereClause */
+ file_path,
+ false, /* is_program */
+ NULL, /* data_source_cb */
+ NIL, /* attnamelist */
+ copy_options);
+
+ /*
+ * Process rows using COPY's csv parsing.
+ * NextCopyFromRawFields uses 64KB buffers internally.
+ */
+ while (NextCopyFromRawFields(cstate, &fields, &nfields))
+ {
+ if (is_first_row)
+ {
+ int i;
- csv_set_space_func(&p, is_space);
- csv_set_term_func(&p, is_term);
+ /* First row is the header - save column names (in main
context) */
+ header_count = nfields;
+ header = (char **) palloc(sizeof(char *) * nfields);
- fp = fopen(file_path, "rb");
- if (!fp)
- {
- ereport(ERROR,
- (errmsg("Failed to open %s\n", file_path)));
- }
+ for (i = 0; i < nfields; i++)
+ {
+ /* Trim whitespace from header fields */
+ header[i] = trim_whitespace(fields[i]);
+ }
- PG_TRY();
- {
- label_seq_name = get_label_seq_relation_name(label_name);
-
- memset((void*)&cr, 0, sizeof(csv_edge_reader));
- cr.alloc = 128;
- cr.fields = palloc(sizeof(char *) * cr.alloc);
- cr.fields_len = palloc(sizeof(size_t *) * cr.alloc);
- cr.header_row_length = 0;
- cr.curr_row_length = 0;
- cr.graph_name = graph_name;
- cr.graph_oid = graph_oid;
- cr.label_name = label_name;
- cr.label_id = label_id;
- cr.label_seq_relid = get_relname_relid(label_seq_name, graph_oid);
- cr.load_as_agtype = load_as_agtype;
-
- /* Initialize the batch insert state */
- init_batch_insert(&cr.batch_state, label_name, graph_oid);
-
- while ((bytes_read=fread(buf, 1, 1024, fp)) > 0)
- {
- if (csv_parse(&p, buf, bytes_read, edge_field_cb,
- edge_row_cb, &cr) != bytes_read)
+ is_first_row = false;
+ }
+ else
{
- ereport(ERROR, (errmsg("Error while parsing file: %s\n",
- csv_strerror(csv_error(&p)))));
+ /* Switch to batch context for row processing */
+ old_context = MemoryContextSwitchTo(batch_context);
+
+ /* Data row - process it */
+ process_edge_row(fields, nfields,
+ header, header_count,
+ label_id, label_seq_relid,
+ graph_oid, load_as_agtype,
+ batch_state);
+
+ /* Switch back to main context */
+ MemoryContextSwitchTo(old_context);
+
+ /* Reset batch context after each batch to free memory */
+ if (batch_state->num_tuples == 0)
+ {
+ MemoryContextReset(batch_context);
+ }
}
}
- csv_fini(&p, edge_field_cb, edge_row_cb, &cr);
-
/* Finish any remaining batch inserts */
- finish_batch_insert(&cr.batch_state);
+ finish_batch_insert(&batch_state);
+ MemoryContextReset(batch_context);
- if (ferror(fp))
- {
- ereport(ERROR, (errmsg("Error while reading file %s\n",
file_path)));
- }
+ /* Clean up COPY state */
+ EndCopyFrom(cstate);
}
PG_FINALLY();
{
- fclose(fp);
- csv_free(&p);
+ /* Free header if allocated */
+ if (header != NULL)
+ {
+ int i;
+ for (i = 0; i < header_count; i++)
+ {
+ pfree(header[i]);
+ }
+ pfree(header);
+ }
+
+ /* Close the relation */
+ table_close(label_rel, RowExclusiveLock);
+
+ /* Delete batch context */
+ MemoryContextDelete(batch_context);
+
+ /* Free parse state */
+ free_parsestate(pstate);
}
PG_END_TRY();
diff --git a/src/backend/utils/load/ag_load_labels.c
b/src/backend/utils/load/ag_load_labels.c
index 1e86bbda..5b11f68b 100644
--- a/src/backend/utils/load/ag_load_labels.c
+++ b/src/backend/utils/load/ag_load_labels.c
@@ -17,155 +17,114 @@
* under the License.
*/
#include "postgres.h"
-#include "executor/spi.h"
+
+#include "access/heapam.h"
+#include "access/table.h"
#include "catalog/namespace.h"
+#include "commands/copy.h"
#include "executor/executor.h"
+#include "nodes/makefuncs.h"
+#include "parser/parse_node.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
#include "utils/load/ag_load_labels.h"
-#include "utils/load/csv.h"
-
-void vertex_field_cb(void *field, size_t field_len, void *data)
-{
-
- csv_vertex_reader *cr = (csv_vertex_reader *) data;
-
- if (cr->error)
- {
- cr->error = 1;
- ereport(NOTICE,(errmsg("There is some unknown error")));
- }
-
- /* check for space to store this field */
- if (cr->cur_field == cr->alloc)
- {
- cr->alloc *= 2;
- cr->fields = repalloc_check(cr->fields, sizeof(char *) * cr->alloc);
- cr->fields_len = repalloc_check(cr->header, sizeof(size_t *) *
cr->alloc);
- if (cr->fields == NULL)
- {
- cr->error = 1;
- ereport(ERROR,
- (errmsg("field_cb: failed to reallocate %zu bytes\n",
- sizeof(char *) * cr->alloc)));
- }
- }
- cr->fields_len[cr->cur_field] = field_len;
- cr->curr_row_length += field_len;
- cr->fields[cr->cur_field] = pnstrdup((char *) field, field_len);
- cr->cur_field += 1;
-}
-void vertex_row_cb(int delim __attribute__((unused)), void *data)
+/*
+ * Process a single vertex row from COPY's raw fields.
+ * Vertex CSV format: [id,] [properties...]
+ */
+static void process_vertex_row(char **fields, int nfields,
+ char **header, int header_count,
+ int label_id, Oid label_seq_relid,
+ bool id_field_exists, bool load_as_agtype,
+ int64 *curr_seq_num,
+ batch_insert_state *batch_state)
{
- csv_vertex_reader *cr = (csv_vertex_reader*)data;
- batch_insert_state *batch_state = cr->batch_state;
- size_t i, n_fields;
graphid vertex_id;
int64 entry_id;
TupleTableSlot *slot;
+ agtype *vertex_properties;
- n_fields = cr->cur_field;
-
- if (cr->row == 0)
+ /* Generate or use provided entry_id */
+ if (id_field_exists)
{
- cr->header_num = cr->cur_field;
- cr->header_row_length = cr->curr_row_length;
- cr->header_len = (size_t* )palloc(sizeof(size_t *) * cr->cur_field);
- cr->header = palloc((sizeof (char*) * cr->cur_field));
-
- for (i = 0; i<cr->cur_field; i++)
+ entry_id = strtol(fields[0], NULL, 10);
+ if (entry_id > *curr_seq_num)
{
- cr->header_len[i] = cr->fields_len[i];
- cr->header[i] = pnstrdup(cr->fields[i], cr->header_len[i]);
+ /* This is needed to ensure the sequence is up-to-date */
+ DirectFunctionCall2(setval_oid,
+ ObjectIdGetDatum(label_seq_relid),
+ Int64GetDatum(entry_id));
+ *curr_seq_num = entry_id;
}
}
else
{
- if (cr->id_field_exists)
- {
- entry_id = strtol(cr->fields[0], NULL, 10);
- if (entry_id > cr->curr_seq_num)
- {
- DirectFunctionCall2(setval_oid,
- ObjectIdGetDatum(cr->label_seq_relid),
- Int64GetDatum(entry_id));
- cr->curr_seq_num = entry_id;
- }
- }
- else
- {
- entry_id = nextval_internal(cr->label_seq_relid, true);
- }
+ entry_id = nextval_internal(label_seq_relid, true);
+ }
- vertex_id = make_graphid(cr->label_id, entry_id);
+ vertex_id = make_graphid(label_id, entry_id);
- /* Get the appropriate slot from the batch state */
- slot = batch_state->slots[batch_state->num_tuples];
+ /* Get the appropriate slot from the batch state */
+ slot = batch_state->slots[batch_state->num_tuples];
- /* Clear the slots contents */
- ExecClearTuple(slot);
+ /* Clear the slot's contents */
+ ExecClearTuple(slot);
- /* Fill the values in the slot */
- slot->tts_values[0] = GRAPHID_GET_DATUM(vertex_id);
- slot->tts_values[1] = AGTYPE_P_GET_DATUM(
- create_agtype_from_list(cr->header, cr->fields,
- n_fields, entry_id,
- cr->load_as_agtype));
- slot->tts_isnull[0] = false;
- slot->tts_isnull[1] = false;
+ /* Build the agtype properties */
+ vertex_properties = create_agtype_from_list(header, fields,
+ nfields, entry_id,
+ load_as_agtype);
- /* Make the slot as containing virtual tuple */
- ExecStoreVirtualTuple(slot);
+ /* Fill the values in the slot */
+ slot->tts_values[0] = GRAPHID_GET_DATUM(vertex_id);
+ slot->tts_values[1] = AGTYPE_P_GET_DATUM(vertex_properties);
+ slot->tts_isnull[0] = false;
+ slot->tts_isnull[1] = false;
- batch_state->num_tuples++;
+ /* Mark the slot as containing a virtual tuple */
+ ExecStoreVirtualTuple(slot);
- if (batch_state->num_tuples >= batch_state->max_tuples)
- {
- /* Insert the batch when it is full (i.e. BATCH_SIZE) */
- insert_batch(batch_state);
- batch_state->num_tuples = 0;
- }
- }
+ batch_state->buffered_bytes += VARSIZE(vertex_properties);
+ batch_state->num_tuples++;
- for (i = 0; i < n_fields; ++i)
+ /* Insert the batch when tuple count OR byte threshold is reached */
+ if (batch_state->num_tuples >= BATCH_SIZE ||
+ batch_state->buffered_bytes >= MAX_BUFFERED_BYTES)
{
- pfree_if_not_null(cr->fields[i]);
+ insert_batch(batch_state);
+ batch_state->num_tuples = 0;
+ batch_state->buffered_bytes = 0;
}
-
- if (cr->error)
- {
- ereport(NOTICE,(errmsg("THere is some error")));
- }
-
- cr->cur_field = 0;
- cr->curr_row_length = 0;
- cr->row += 1;
}
-static int is_space(unsigned char c)
+/*
+ * Create COPY options for csv parsing.
+ * Returns a List of DefElem nodes.
+ */
+static List *create_copy_options(void)
{
- if (c == CSV_SPACE || c == CSV_TAB)
- {
- return 1;
- }
- else
- {
- return 0;
- }
+ List *options = NIL;
-}
-static int is_term(unsigned char c)
-{
- if (c == CSV_CR || c == CSV_LF)
- {
- return 1;
- }
- else
- {
- return 0;
- }
+ /* FORMAT csv */
+ options = lappend(options,
+ makeDefElem("format",
+ (Node *) makeString("csv"),
+ -1));
+
+ /* HEADER false - we'll read the header ourselves */
+ options = lappend(options,
+ makeDefElem("header",
+ (Node *) makeBoolean(false),
+ -1));
+
+ return options;
}
+/*
+ * Load vertex labels from csv file using pg's COPY infrastructure.
+ */
int create_labels_from_csv_file(char *file_path,
char *graph_name,
Oid graph_oid,
@@ -174,96 +133,146 @@ int create_labels_from_csv_file(char *file_path,
bool id_field_exists,
bool load_as_agtype)
{
-
- FILE *fp;
- struct csv_parser p;
- char buf[1024];
- size_t bytes_read;
- unsigned char options = 0;
- csv_vertex_reader cr;
- char *label_seq_name;
-
- if (csv_init(&p, options) != 0)
+ Relation label_rel;
+ Oid label_relid;
+ CopyFromState cstate;
+ List *copy_options;
+ ParseState *pstate;
+ char **fields;
+ int nfields;
+ char **header = NULL;
+ int header_count = 0;
+ bool is_first_row = true;
+ char *label_seq_name;
+ Oid label_seq_relid;
+ int64 curr_seq_num = 0;
+ batch_insert_state *batch_state = NULL;
+ MemoryContext batch_context;
+ MemoryContext old_context;
+
+ /* Create a memory context for batch processing - reset after each batch */
+ batch_context = AllocSetContextCreate(CurrentMemoryContext,
+ "AGE CSV Load Batch Context",
+ ALLOCSET_DEFAULT_SIZES);
+
+ /* Get the label relation */
+ label_relid = get_label_relation(label_name, graph_oid);
+ label_rel = table_open(label_relid, RowExclusiveLock);
+
+ /* Get sequence info */
+ label_seq_name = get_label_seq_relation_name(label_name);
+ label_seq_relid = get_relname_relid(label_seq_name, graph_oid);
+
+ if (id_field_exists)
{
- ereport(ERROR,
- (errmsg("Failed to initialize csv parser\n")));
+ /*
+ * Set the curr_seq_num since we will need it to compare with
+ * incoming entry_id.
+ */
+ curr_seq_num = nextval_internal(label_seq_relid, true);
}
- p.malloc_func = palloc;
- p.realloc_func = repalloc_check;
- p.free_func = pfree_if_not_null;
+ /* Initialize the batch insert state */
+ init_batch_insert(&batch_state, label_name, graph_oid);
- csv_set_space_func(&p, is_space);
- csv_set_term_func(&p, is_term);
+ /* Create COPY options for CSV parsing */
+ copy_options = create_copy_options();
- fp = fopen(file_path, "rb");
- if (!fp)
- {
- ereport(ERROR,
- (errmsg("Failed to open %s\n", file_path)));
- }
+ /* Create a minimal ParseState for BeginCopyFrom */
+ pstate = make_parsestate(NULL);
PG_TRY();
{
- label_seq_name = get_label_seq_relation_name(label_name);
-
- memset((void*)&cr, 0, sizeof(csv_vertex_reader));
-
- cr.alloc = 2048;
- cr.fields = palloc(sizeof(char *) * cr.alloc);
- cr.fields_len = palloc(sizeof(size_t *) * cr.alloc);
- cr.header_row_length = 0;
- cr.curr_row_length = 0;
- cr.graph_name = graph_name;
- cr.graph_oid = graph_oid;
- cr.label_name = label_name;
- cr.label_id = label_id;
- cr.id_field_exists = id_field_exists;
- cr.label_seq_relid = get_relname_relid(label_seq_name, graph_oid);
- cr.load_as_agtype = load_as_agtype;
-
- if (cr.id_field_exists)
+ /*
+ * Initialize COPY FROM state.
+ * We pass the label relation but will only use NextCopyFromRawFields
+ * which returns raw parsed strings without type conversion.
+ */
+ cstate = BeginCopyFrom(pstate,
+ label_rel,
+ NULL, /* whereClause */
+ file_path,
+ false, /* is_program */
+ NULL, /* data_source_cb */
+ NIL, /* attnamelist - NULL means all
columns */
+ copy_options);
+
+ /*
+ * Process rows using COPY's csv parsing.
+ * NextCopyFromRawFields uses 64KB buffers internally.
+ */
+ while (NextCopyFromRawFields(cstate, &fields, &nfields))
{
- /*
- * Set the curr_seq_num since we will need it to compare with
- * incoming entry_id.
- *
- * We cant use currval because it will error out if nextval was
- * not called before in the session.
- */
- cr.curr_seq_num = nextval_internal(cr.label_seq_relid, true);
- }
+ if (is_first_row)
+ {
+ int i;
- /* Initialize the batch insert state */
- init_batch_insert(&cr.batch_state, label_name, graph_oid);
+ /* First row is the header - save column names (in main
context) */
+ header_count = nfields;
+ header = (char **) palloc(sizeof(char *) * nfields);
- while ((bytes_read=fread(buf, 1, 1024, fp)) > 0)
- {
- if (csv_parse(&p, buf, bytes_read, vertex_field_cb,
- vertex_row_cb, &cr) != bytes_read)
+ for (i = 0; i < nfields; i++)
+ {
+ /* Trim whitespace from header fields */
+ header[i] = trim_whitespace(fields[i]);
+ }
+
+ is_first_row = false;
+ }
+ else
{
- ereport(ERROR, (errmsg("Error while parsing file: %s\n",
- csv_strerror(csv_error(&p)))));
+ /* Switch to batch context for row processing */
+ old_context = MemoryContextSwitchTo(batch_context);
+
+ /* Data row - process it */
+ process_vertex_row(fields, nfields,
+ header, header_count,
+ label_id, label_seq_relid,
+ id_field_exists, load_as_agtype,
+ &curr_seq_num,
+ batch_state);
+
+ /* Switch back to main context */
+ MemoryContextSwitchTo(old_context);
+
+ /* Reset batch context after each batch to free memory */
+ if (batch_state->num_tuples == 0)
+ {
+ MemoryContextReset(batch_context);
+ }
}
}
- csv_fini(&p, vertex_field_cb, vertex_row_cb, &cr);
-
/* Finish any remaining batch inserts */
- finish_batch_insert(&cr.batch_state);
+ finish_batch_insert(&batch_state);
+ MemoryContextReset(batch_context);
- if (ferror(fp))
- {
- ereport(ERROR, (errmsg("Error while reading file %s\n",
- file_path)));
- }
+ /* Clean up COPY state */
+ EndCopyFrom(cstate);
}
PG_FINALLY();
{
- fclose(fp);
- csv_free(&p);
+ /* Free header if allocated */
+ if (header != NULL)
+ {
+ int i;
+ for (i = 0; i < header_count; i++)
+ {
+ pfree(header[i]);
+ }
+ pfree(header);
+ }
+
+ /* Close the relation */
+ table_close(label_rel, RowExclusiveLock);
+
+ /* Delete batch context */
+ MemoryContextDelete(batch_context);
+
+ /* Free parse state */
+ free_parsestate(pstate);
}
PG_END_TRY();
return EXIT_SUCCESS;
-}
\ No newline at end of file
+}
diff --git a/src/backend/utils/load/age_load.c
b/src/backend/utils/load/age_load.c
index c7cf0677..f9634668 100644
--- a/src/backend/utils/load/age_load.c
+++ b/src/backend/utils/load/age_load.c
@@ -18,24 +18,81 @@
*/
#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/table.h"
+#include "access/tableam.h"
+#include "access/xact.h"
#include "catalog/indexing.h"
+#include "catalog/pg_authid.h"
#include "executor/executor.h"
+#include "miscadmin.h"
+#include "nodes/parsenodes.h"
+#include "parser/parse_relation.h"
+#include "utils/acl.h"
#include "utils/json.h"
+#include "utils/rel.h"
+#include "utils/rls.h"
#include "utils/load/ag_load_edges.h"
#include "utils/load/ag_load_labels.h"
#include "utils/load/age_load.h"
-#include "utils/rel.h"
static agtype_value *csv_value_to_agtype_value(char *csv_val);
static Oid get_or_create_graph(const Name graph_name);
static int32 get_or_create_label(Oid graph_oid, char *graph_name,
char *label_name, char label_kind);
static char *build_safe_filename(char *name);
+static void check_file_read_permission(void);
+static void check_table_permissions(Oid relid);
+static void check_rls_for_load(Oid relid);
#define AGE_BASE_CSV_DIRECTORY "/tmp/age/"
#define AGE_CSV_FILE_EXTENSION ".csv"
+/*
+ * Trim leading and trailing whitespace from a string.
+ * Returns a newly allocated string with whitespace removed.
+ * Returns empty string for NULL input.
+ */
+char *trim_whitespace(const char *str)
+{
+ const char *start;
+ const char *end;
+ size_t len;
+
+ if (str == NULL)
+ {
+ return pstrdup("");
+ }
+
+ /* Find first non-whitespace character */
+ start = str;
+ while (*start && (*start == ' ' || *start == '\t' ||
+ *start == '\n' || *start == '\r'))
+ {
+ start++;
+ }
+
+ /* If string is all whitespace, return empty string */
+ if (*start == '\0')
+ {
+ return pstrdup("");
+ }
+
+ /* Find last non-whitespace character */
+ end = str + strlen(str) - 1;
+ while (end > start && (*end == ' ' || *end == '\t' ||
+ *end == '\n' || *end == '\r'))
+ {
+ end--;
+ }
+
+ /* Copy the trimmed string */
+ len = end - start + 1;
+ return pnstrdup(start, len);
+}
+
static char *build_safe_filename(char *name)
{
int length;
@@ -88,6 +145,51 @@ static char *build_safe_filename(char *name)
return resolved;
}
+/*
+ * Check if the current user has permission to read server files.
+ * Only users with the pg_read_server_files role can load from files.
+ */
+static void check_file_read_permission(void)
+{
+ if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_SERVER_FILES))
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("permission denied to LOAD from a file"),
+ errdetail("Only roles with privileges of the \"%s\" role may
LOAD from a file.",
+ "pg_read_server_files")));
+ }
+}
+
+/*
+ * Check if the current user has INSERT permission on the target table.
+ */
+static void check_table_permissions(Oid relid)
+{
+ AclResult aclresult;
+
+ aclresult = pg_class_aclcheck(relid, GetUserId(), ACL_INSERT);
+ if (aclresult != ACLCHECK_OK)
+ {
+ aclcheck_error(aclresult, OBJECT_TABLE, get_rel_name(relid));
+ }
+}
+
+/*
+ * Check if RLS is enabled on the target table.
+ * CSV loading is not supported with row-level security.
+ */
+static void check_rls_for_load(Oid relid)
+{
+ if (check_enable_rls(relid, InvalidOid, true) == RLS_ENABLED)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("LOAD from file is not supported with row-level
security"),
+ errhint("Use Cypher CREATE clause instead.")));
+ }
+}
+
agtype *create_empty_agtype(void)
{
agtype* out;
@@ -118,6 +220,14 @@ static agtype_value *csv_value_to_agtype_value(char
*csv_val)
char *new_csv_val;
agtype_value *res;
+ /* Handle NULL or empty input - return null agtype value */
+ if (csv_val == NULL || csv_val[0] == '\0')
+ {
+ res = palloc(sizeof(agtype_value));
+ res->type = AGTV_NULL;
+ return res;
+ }
+
if (!json_validate(cstring_to_text(csv_val), false, false))
{
/* wrap the string with double-quote */
@@ -175,18 +285,40 @@ agtype *create_agtype_from_list(char **header, char
**fields, size_t fields_len,
for (i = 0; i<fields_len; i++)
{
+ char *trimmed_value;
+
+ /* Skip empty header fields (e.g., from trailing commas) */
+ if (header[i] == NULL || header[i][0] == '\0')
+ {
+ continue;
+ }
+
key_agtype = string_to_agtype_value(header[i]);
result.res = push_agtype_value(&result.parse_state,
WAGT_KEY,
key_agtype);
+ /* Trim whitespace from field value */
+ trimmed_value = trim_whitespace(fields[i]);
+
if (load_as_agtype)
{
- value_agtype = csv_value_to_agtype_value(fields[i]);
+ value_agtype = csv_value_to_agtype_value(trimmed_value);
}
else
{
- value_agtype = string_to_agtype_value(fields[i]);
+ /* Handle empty field values */
+ if (trimmed_value[0] == '\0')
+ {
+ value_agtype = palloc(sizeof(agtype_value));
+ value_agtype->type = AGTV_STRING;
+ value_agtype->val.string.len = 0;
+ value_agtype->val.string.val = pstrdup("");
+ }
+ else
+ {
+ value_agtype = string_to_agtype_value(trimmed_value);
+ }
}
result.res = push_agtype_value(&result.parse_state,
@@ -228,18 +360,40 @@ agtype* create_agtype_from_list_i(char **header, char
**fields,
for (i = start_index; i < fields_len; i++)
{
+ char *trimmed_value;
+
+ /* Skip empty header fields (e.g., from trailing commas) */
+ if (header[i] == NULL || header[i][0] == '\0')
+ {
+ continue;
+ }
+
key_agtype = string_to_agtype_value(header[i]);
result.res = push_agtype_value(&result.parse_state,
WAGT_KEY,
key_agtype);
+ /* Trim whitespace from field value */
+ trimmed_value = trim_whitespace(fields[i]);
+
if (load_as_agtype)
{
- value_agtype = csv_value_to_agtype_value(fields[i]);
+ value_agtype = csv_value_to_agtype_value(trimmed_value);
}
else
{
- value_agtype = string_to_agtype_value(fields[i]);
+ /* Handle empty field values */
+ if (trimmed_value[0] == '\0')
+ {
+ value_agtype = palloc(sizeof(agtype_value));
+ value_agtype->type = AGTV_STRING;
+ value_agtype->val.string.len = 0;
+ value_agtype->val.string.val = pstrdup("");
+ }
+ else
+ {
+ value_agtype = string_to_agtype_value(trimmed_value);
+ }
}
result.res = push_agtype_value(&result.parse_state,
@@ -362,11 +516,24 @@ void insert_batch(batch_insert_state *batch_state)
List *result;
int i;
+ /* Check constraints for each tuple before inserting */
+ if (batch_state->resultRelInfo->ri_RelationDesc->rd_att->constr)
+ {
+ for (i = 0; i < batch_state->num_tuples; i++)
+ {
+ ExecConstraints(batch_state->resultRelInfo,
+ batch_state->slots[i],
+ batch_state->estate);
+ }
+ }
+
/* Insert the tuples */
heap_multi_insert(batch_state->resultRelInfo->ri_RelationDesc,
batch_state->slots, batch_state->num_tuples,
- GetCurrentCommandId(true), 0, NULL);
-
+ GetCurrentCommandId(true),
+ TABLE_INSERT_SKIP_FSM, /* Skip free space map for bulk
*/
+ batch_state->bistate); /* Use bulk insert state */
+
/* Insert index entries for the tuples */
if (batch_state->resultRelInfo->ri_NumIndices > 0)
{
@@ -405,6 +572,7 @@ Datum load_labels_from_file(PG_FUNCTION_ARGS)
char* label_name_str;
char* file_path_str;
Oid graph_oid;
+ Oid label_relid;
int32 label_id;
bool id_field_exists;
bool load_as_agtype;
@@ -427,6 +595,9 @@ Datum load_labels_from_file(PG_FUNCTION_ARGS)
errmsg("file path must not be NULL")));
}
+ /* Check file read permission first */
+ check_file_read_permission();
+
graph_name = PG_GETARG_NAME(0);
label_name = PG_GETARG_NAME(1);
file_name = PG_GETARG_TEXT_P(2);
@@ -447,6 +618,11 @@ Datum load_labels_from_file(PG_FUNCTION_ARGS)
label_id = get_or_create_label(graph_oid, graph_name_str,
label_name_str, LABEL_KIND_VERTEX);
+ /* Get the label relation and check permissions */
+ label_relid = get_label_relation(label_name_str, graph_oid);
+ check_table_permissions(label_relid);
+ check_rls_for_load(label_relid);
+
create_labels_from_csv_file(file_path_str, graph_name_str, graph_oid,
label_name_str, label_id, id_field_exists,
load_as_agtype);
@@ -459,7 +635,6 @@ Datum load_labels_from_file(PG_FUNCTION_ARGS)
PG_FUNCTION_INFO_V1(load_edges_from_file);
Datum load_edges_from_file(PG_FUNCTION_ARGS)
{
-
Name graph_name;
Name label_name;
text* file_name;
@@ -467,6 +642,7 @@ Datum load_edges_from_file(PG_FUNCTION_ARGS)
char* label_name_str;
char* file_path_str;
Oid graph_oid;
+ Oid label_relid;
int32 label_id;
bool load_as_agtype;
@@ -488,6 +664,9 @@ Datum load_edges_from_file(PG_FUNCTION_ARGS)
errmsg("file path must not be NULL")));
}
+ /* Check file read permission first */
+ check_file_read_permission();
+
graph_name = PG_GETARG_NAME(0);
label_name = PG_GETARG_NAME(1);
file_name = PG_GETARG_TEXT_P(2);
@@ -507,6 +686,11 @@ Datum load_edges_from_file(PG_FUNCTION_ARGS)
label_id = get_or_create_label(graph_oid, graph_name_str,
label_name_str, LABEL_KIND_EDGE);
+ /* Get the label relation and check permissions */
+ label_relid = get_label_relation(label_name_str, graph_oid);
+ check_table_permissions(label_relid);
+ check_rls_for_load(label_relid);
+
create_edges_from_csv_file(file_path_str, graph_name_str, graph_oid,
label_name_str, label_id, load_as_agtype);
@@ -597,19 +781,42 @@ void init_batch_insert(batch_insert_state **batch_state,
Oid relid;
EState *estate;
ResultRelInfo *resultRelInfo;
+ RangeTblEntry *rte;
+ RTEPermissionInfo *perminfo;
+ List *range_table = NIL;
+ List *perminfos = NIL;
int i;
- /* Open the relation */
+ /* Get the relation OID */
relid = get_label_relation(label_name, graph_oid);
- relation = table_open(relid, RowExclusiveLock);
/* Initialize executor state */
estate = CreateExecutorState();
- /* Initialize resultRelInfo */
+ /* Create range table entry for ExecConstraints */
+ rte = makeNode(RangeTblEntry);
+ rte->rtekind = RTE_RELATION;
+ rte->relid = relid;
+ rte->relkind = RELKIND_RELATION;
+ rte->rellockmode = RowExclusiveLock;
+ rte->perminfoindex = 1;
+ range_table = list_make1(rte);
+
+ /* Create permission info */
+ perminfo = makeNode(RTEPermissionInfo);
+ perminfo->relid = relid;
+ perminfo->requiredPerms = ACL_INSERT;
+ perminfos = list_make1(perminfo);
+
+ /* Initialize range table in executor state */
+ ExecInitRangeTable(estate, range_table, perminfos);
+
+ /* Initialize resultRelInfo - this opens the relation */
resultRelInfo = makeNode(ResultRelInfo);
- InitResultRelInfo(resultRelInfo, relation, 1, NULL, estate->es_instrument);
- estate->es_result_relations = &resultRelInfo;
+ ExecInitResultRelation(estate, resultRelInfo, 1);
+
+ /* Get relation from resultRelInfo (opened by ExecInitResultRelation) */
+ relation = resultRelInfo->ri_RelationDesc;
/* Open the indices */
ExecOpenIndices(resultRelInfo, false);
@@ -619,8 +826,9 @@ void init_batch_insert(batch_insert_state **batch_state,
(*batch_state)->slots = palloc(sizeof(TupleTableSlot *) * BATCH_SIZE);
(*batch_state)->estate = estate;
(*batch_state)->resultRelInfo = resultRelInfo;
- (*batch_state)->max_tuples = BATCH_SIZE;
(*batch_state)->num_tuples = 0;
+ (*batch_state)->buffered_bytes = 0;
+ (*batch_state)->bistate = GetBulkInsertState();
/* Create slots */
for (i = 0; i < BATCH_SIZE; i++)
@@ -651,12 +859,14 @@ void finish_batch_insert(batch_insert_state **batch_state)
ExecDropSingleTupleTableSlot((*batch_state)->slots[i]);
}
- /* Clean up, close the indices and relation */
- ExecCloseIndices((*batch_state)->resultRelInfo);
- table_close((*batch_state)->resultRelInfo->ri_RelationDesc,
- RowExclusiveLock);
+ /* Free BulkInsertState */
+ FreeBulkInsertState((*batch_state)->bistate);
+
+ /* Close result relations and range table relations */
+ ExecCloseResultRelations((*batch_state)->estate);
+ ExecCloseRangeTableRelations((*batch_state)->estate);
- /* Clean up batch state */
+ /* Clean up executor state */
FreeExecutorState((*batch_state)->estate);
pfree((*batch_state)->slots);
pfree(*batch_state);
diff --git a/src/backend/utils/load/libcsv.c b/src/backend/utils/load/libcsv.c
deleted file mode 100644
index f0e8b46b..00000000
--- a/src/backend/utils/load/libcsv.c
+++ /dev/null
@@ -1,549 +0,0 @@
-/*
-libcsv - parse and write csv data
-Copyright (C) 2008 Robert Gamble
-
-This library is free software; you can redistribute it and/or
-modify it under the terms of the GNU Lesser General Public
-License as published by the Free Software Foundation; either
-version 2.1 of the License, or (at your option) any later version.
-
-This library is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-Lesser General Public License for more details.
-
-You should have received a copy of the GNU Lesser General Public
-License along with this library; if not, write to the Free Software
-Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-*/
-
-#include <assert.h>
-
-#if __STDC_VERSION__ >= 199901L
-# include <stdint.h>
-#else
- /* C89 doesn't have stdint.h or SIZE_MAX */
-# define SIZE_MAX ((size_t)-1)
-#endif
-
-#include "utils/load/csv.h"
-
-#define VERSION "3.0.3"
-
-#define ROW_NOT_BEGUN 0
-#define FIELD_NOT_BEGUN 1
-#define FIELD_BEGUN 2
-#define FIELD_MIGHT_HAVE_ENDED 3
-
-/*
- Explanation of states
- ROW_NOT_BEGUN There have not been any fields encountered for this row
- FIELD_NOT_BEGUN There have been fields but we are currently not in one
- FIELD_BEGUN We are in a field
- FIELD_MIGHT_HAVE_ENDED
- We encountered a double quote inside a quoted field, the
- field is either ended or the quote is literal
-*/
-
-#define MEM_BLK_SIZE 128
-
-#define SUBMIT_FIELD(p) \
- do { \
- if (!quoted) \
- entry_pos -= spaces; \
- if (p->options & CSV_APPEND_NULL) \
- ((p)->entry_buf[entry_pos]) = '\0'; \
- if (cb1 && (p->options & CSV_EMPTY_IS_NULL) && !quoted && entry_pos == 0) \
- cb1(NULL, entry_pos, data); \
- else if (cb1) \
- cb1(p->entry_buf, entry_pos, data); \
- pstate = FIELD_NOT_BEGUN; \
- entry_pos = quoted = spaces = 0; \
- } while (0)
-
-#define SUBMIT_ROW(p, c) \
- do { \
- if (cb2) \
- cb2(c, data); \
- pstate = ROW_NOT_BEGUN; \
- entry_pos = quoted = spaces = 0; \
- } while (0)
-
-#define SUBMIT_CHAR(p, c) ((p)->entry_buf[entry_pos++] = (c))
-
-static const char *csv_errors[] = {"success",
- "error parsing data while strict checking
enabled",
- "memory exhausted while increasing buffer size",
- "data size too large",
- "invalid status code"};
-
-int
-csv_error(const struct csv_parser *p)
-{
- assert(p && "received null csv_parser");
-
- /* Return the current status of the parser */
- return p->status;
-}
-
-const char *
-csv_strerror(int status)
-{
- /* Return a textual description of status */
- if (status >= CSV_EINVALID || status < 0)
- return csv_errors[CSV_EINVALID];
- else
- return csv_errors[status];
-}
-
-int
-csv_get_opts(const struct csv_parser *p)
-{
- /* Return the currently set options of parser */
- if (p == NULL)
- return -1;
-
- return p->options;
-}
-
-int
-csv_set_opts(struct csv_parser *p, unsigned char options)
-{
- /* Set the options */
- if (p == NULL)
- return -1;
-
- p->options = options;
- return 0;
-}
-
-int
-csv_init(struct csv_parser *p, unsigned char options)
-{
- /* Initialize a csv_parser object returns 0 on success, -1 on error */
- if (p == NULL)
- return -1;
-
- p->entry_buf = NULL;
- p->pstate = ROW_NOT_BEGUN;
- p->quoted = 0;
- p->spaces = 0;
- p->entry_pos = 0;
- p->entry_size = 0;
- p->status = 0;
- p->options = options;
- p->quote_char = CSV_QUOTE;
- p->delim_char = CSV_COMMA;
- p->is_space = NULL;
- p->is_term = NULL;
- p->blk_size = MEM_BLK_SIZE;
- p->malloc_func = NULL;
- p->realloc_func = realloc;
- p->free_func = free;
-
- return 0;
-}
-
-void
-csv_free(struct csv_parser *p)
-{
- /* Free the entry_buffer of csv_parser object */
- if (p == NULL)
- return;
-
- if (p->entry_buf && p->free_func)
- p->free_func(p->entry_buf);
-
- p->entry_buf = NULL;
- p->entry_size = 0;
-
- return;
-}
-
-int
-csv_fini(struct csv_parser *p, void (*cb1)(void *, size_t, void *), void
(*cb2)(int c, void *), void *data)
-{
- int quoted;
- int pstate;
- size_t spaces;
- size_t entry_pos;
-
- if (p == NULL)
- return -1;
-
- /* Finalize parsing. Needed, for example, when file does not end in a
newline */
- quoted = p->quoted;
- pstate = p->pstate;
- spaces = p->spaces;
- entry_pos = p->entry_pos;
-
- if ((pstate == FIELD_BEGUN) && p->quoted && (p->options & CSV_STRICT) &&
(p->options & CSV_STRICT_FINI)) {
- /* Current field is quoted, no end-quote was seen, and CSV_STRICT_FINI is
set */
- p->status = CSV_EPARSE;
- return -1;
- }
-
- switch (pstate) {
- case FIELD_MIGHT_HAVE_ENDED:
- p->entry_pos -= p->spaces + 1; /* get rid of spaces and original quote
*/
- entry_pos = p->entry_pos;
- /*lint -fallthrough */
- case FIELD_NOT_BEGUN:
- case FIELD_BEGUN:
- /* Unnecessary:
- quoted = p->quoted, pstate = p->pstate;
- spaces = p->spaces, entry_pos = p->entry_pos;
- */
- SUBMIT_FIELD(p);
- SUBMIT_ROW(p, -1);
- break;
- case ROW_NOT_BEGUN: /* Already ended properly */
- ;
- }
-
- /* Reset parser */
- p->spaces = p->quoted = p->entry_pos = p->status = 0;
- p->pstate = ROW_NOT_BEGUN;
-
- return 0;
-}
-
-void
-csv_set_delim(struct csv_parser *p, unsigned char c)
-{
- /* Set the delimiter */
- if (p) p->delim_char = c;
-}
-
-void
-csv_set_quote(struct csv_parser *p, unsigned char c)
-{
- /* Set the quote character */
- if (p) p->quote_char = c;
-}
-
-unsigned char
-csv_get_delim(const struct csv_parser *p)
-{
- assert(p && "received null csv_parser");
-
- /* Get the delimiter */
- return p->delim_char;
-}
-
-unsigned char
-csv_get_quote(const struct csv_parser *p)
-{
- assert(p && "received null csv_parser");
-
- /* Get the quote character */
- return p->quote_char;
-}
-
-void
-csv_set_space_func(struct csv_parser *p, int (*f)(unsigned char))
-{
- /* Set the space function */
- if (p) p->is_space = f;
-}
-
-void
-csv_set_term_func(struct csv_parser *p, int (*f)(unsigned char))
-{
- /* Set the term function */
- if (p) p->is_term = f;
-}
-
-void
-csv_set_realloc_func(struct csv_parser *p, void *(*f)(void *, size_t))
-{
- /* Set the realloc function used to increase buffer size */
- if (p && f) p->realloc_func = f;
-}
-
-void
-csv_set_free_func(struct csv_parser *p, void (*f)(void *))
-{
- /* Set the free function used to free the buffer */
- if (p && f) p->free_func = f;
-}
-
-void
-csv_set_blk_size(struct csv_parser *p, size_t size)
-{
- /* Set the block size used to increment buffer size */
- if (p) p->blk_size = size;
-}
-
-size_t
-csv_get_buffer_size(const struct csv_parser *p)
-{
- /* Get the size of the entry buffer */
- if (p)
- return p->entry_size;
- return 0;
-}
-
-static int
-csv_increase_buffer(struct csv_parser *p)
-{
- size_t to_add;
- void *vp;
-
- if (p == NULL) return 0;
- if (p->realloc_func == NULL) return 0;
-
- /* Increase the size of the entry buffer. Attempt to increase size by
- * p->blk_size, if this is larger than SIZE_MAX try to increase current
- * buffer size to SIZE_MAX. If allocation fails, try to allocate halve
- * the size and try again until successful or increment size is zero.
- */
-
- to_add = p->blk_size;
-
- if ( p->entry_size >= SIZE_MAX - to_add )
- to_add = SIZE_MAX - p->entry_size;
-
- if (!to_add) {
- p->status = CSV_ETOOBIG;
- return -1;
- }
-
- while ((vp = p->realloc_func(p->entry_buf, p->entry_size + to_add)) == NULL)
{
- to_add /= 2;
- if (!to_add) {
- p->status = CSV_ENOMEM;
- return -1;
- }
- }
-
- /* Update entry buffer pointer and entry_size if successful */
- p->entry_buf = vp;
- p->entry_size += to_add;
- return 0;
-}
-
-size_t
-csv_parse(struct csv_parser *p, const void *s, size_t len, void (*cb1)(void *,
size_t, void *), void (*cb2)(int c, void *), void *data)
-{
- unsigned const char *us = s; /* Access input data as array of unsigned char
*/
- unsigned char c; /* The character we are currently processing */
- size_t pos = 0; /* The number of characters we have processed
in this call */
-
- /* Store key fields into local variables for performance */
- unsigned char delim = p->delim_char;
- unsigned char quote = p->quote_char;
- int (*is_space)(unsigned char) = p->is_space;
- int (*is_term)(unsigned char) = p->is_term;
- int quoted = p->quoted;
- int pstate = p->pstate;
- size_t spaces = p->spaces;
- size_t entry_pos = p->entry_pos;
-
-
- if (!p->entry_buf && pos < len) {
- /* Buffer hasn't been allocated yet and len > 0 */
- if (csv_increase_buffer(p) != 0) {
- p->quoted = quoted, p->pstate = pstate, p->spaces = spaces, p->entry_pos
= entry_pos;
- return pos;
- }
- }
-
- while (pos < len) {
- /* Check memory usage, increase buffer if necessary */
- if (entry_pos == ((p->options & CSV_APPEND_NULL) ? p->entry_size - 1 :
p->entry_size) ) {
- if (csv_increase_buffer(p) != 0) {
- p->quoted = quoted, p->pstate = pstate, p->spaces = spaces,
p->entry_pos = entry_pos;
- return pos;
- }
- }
-
- c = us[pos++];
-
- switch (pstate) {
- case ROW_NOT_BEGUN:
- case FIELD_NOT_BEGUN:
- if ((is_space ? is_space(c) : c == CSV_SPACE || c == CSV_TAB) &&
c!=delim) { /* Space or Tab */
- continue;
- } else if (is_term ? is_term(c) : c == CSV_CR || c == CSV_LF) { /*
Carriage Return or Line Feed */
- if (pstate == FIELD_NOT_BEGUN) {
- SUBMIT_FIELD(p);
- SUBMIT_ROW(p, c);
- } else { /* ROW_NOT_BEGUN */
- /* Don't submit empty rows by default */
- if (p->options & CSV_REPALL_NL) {
- SUBMIT_ROW(p, c);
- }
- }
- continue;
- } else if (c == delim) { /* Comma */
- SUBMIT_FIELD(p);
- break;
- } else if (c == quote) { /* Quote */
- pstate = FIELD_BEGUN;
- quoted = 1;
- } else { /* Anything else */
- pstate = FIELD_BEGUN;
- quoted = 0;
- SUBMIT_CHAR(p, c);
- }
- break;
- case FIELD_BEGUN:
- if (c == quote) { /* Quote */
- if (quoted) {
- SUBMIT_CHAR(p, c);
- pstate = FIELD_MIGHT_HAVE_ENDED;
- } else {
- /* STRICT ERROR - double quote inside non-quoted field */
- if (p->options & CSV_STRICT) {
- p->status = CSV_EPARSE;
- p->quoted = quoted, p->pstate = pstate, p->spaces = spaces,
p->entry_pos = entry_pos;
- return pos-1;
- }
- SUBMIT_CHAR(p, c);
- spaces = 0;
- }
- } else if (c == delim) { /* Comma */
- if (quoted) {
- SUBMIT_CHAR(p, c);
- } else {
- SUBMIT_FIELD(p);
- }
- } else if (is_term ? is_term(c) : c == CSV_CR || c == CSV_LF) { /*
Carriage Return or Line Feed */
- if (!quoted) {
- SUBMIT_FIELD(p);
- SUBMIT_ROW(p, c);
- } else {
- SUBMIT_CHAR(p, c);
- }
- } else if (!quoted && (is_space? is_space(c) : c == CSV_SPACE || c ==
CSV_TAB)) { /* Tab or space for non-quoted field */
- SUBMIT_CHAR(p, c);
- spaces++;
- } else { /* Anything else */
- SUBMIT_CHAR(p, c);
- spaces = 0;
- }
- break;
- case FIELD_MIGHT_HAVE_ENDED:
- /* This only happens when a quote character is encountered in a quoted
field */
- if (c == delim) { /* Comma */
- entry_pos -= spaces + 1; /* get rid of spaces and original quote */
- SUBMIT_FIELD(p);
- } else if (is_term ? is_term(c) : c == CSV_CR || c == CSV_LF) { /*
Carriage Return or Line Feed */
- entry_pos -= spaces + 1; /* get rid of spaces and original quote */
- SUBMIT_FIELD(p);
- SUBMIT_ROW(p, c);
- } else if (is_space ? is_space(c) : c == CSV_SPACE || c == CSV_TAB) {
/* Space or Tab */
- SUBMIT_CHAR(p, c);
- spaces++;
- } else if (c == quote) { /* Quote */
- if (spaces) {
- /* STRICT ERROR - unescaped double quote */
- if (p->options & CSV_STRICT) {
- p->status = CSV_EPARSE;
- p->quoted = quoted, p->pstate = pstate, p->spaces = spaces,
p->entry_pos = entry_pos;
- return pos-1;
- }
- spaces = 0;
- SUBMIT_CHAR(p, c);
- } else {
- /* Two quotes in a row */
- pstate = FIELD_BEGUN;
- }
- } else { /* Anything else */
- /* STRICT ERROR - unescaped double quote */
- if (p->options & CSV_STRICT) {
- p->status = CSV_EPARSE;
- p->quoted = quoted, p->pstate = pstate, p->spaces = spaces,
p->entry_pos = entry_pos;
- return pos-1;
- }
- pstate = FIELD_BEGUN;
- spaces = 0;
- SUBMIT_CHAR(p, c);
- }
- break;
- default:
- break;
- }
- }
- p->quoted = quoted, p->pstate = pstate, p->spaces = spaces, p->entry_pos =
entry_pos;
- return pos;
-}
-
-size_t
-csv_write (void *dest, size_t dest_size, const void *src, size_t src_size)
-{
- return csv_write2(dest, dest_size, src, src_size, CSV_QUOTE);
-}
-
-int
-csv_fwrite (FILE *fp, const void *src, size_t src_size)
-{
- return csv_fwrite2(fp, src, src_size, CSV_QUOTE);
-}
-
-size_t
-csv_write2 (void *dest, size_t dest_size, const void *src, size_t src_size, unsigned char quote)
-{
- unsigned char *cdest = dest;
- const unsigned char *csrc = src;
- size_t chars = 0;
-
- if (src == NULL)
- return 0;
-
- if (dest == NULL)
- dest_size = 0;
-
- if (dest_size > 0)
- *cdest++ = quote;
- chars++;
-
- while (src_size) {
- if (*csrc == quote) {
- if (dest_size > chars)
- *cdest++ = quote;
- if (chars < SIZE_MAX) chars++;
- }
- if (dest_size > chars)
- *cdest++ = *csrc;
- if (chars < SIZE_MAX) chars++;
- src_size--;
- csrc++;
- }
-
- if (dest_size > chars)
- *cdest = quote;
- if (chars < SIZE_MAX) chars++;
-
- return chars;
-}
-
-int
-csv_fwrite2 (FILE *fp, const void *src, size_t src_size, unsigned char quote)
-{
- const unsigned char *csrc = src;
-
- if (fp == NULL || src == NULL)
- return 0;
-
- if (fputc(quote, fp) == EOF)
- return EOF;
-
- while (src_size) {
- if (*csrc == quote) {
- if (fputc(quote, fp) == EOF)
- return EOF;
- }
- if (fputc(*csrc, fp) == EOF)
- return EOF;
- src_size--;
- csrc++;
- }
-
- if (fputc(quote, fp) == EOF) {
- return EOF;
- }
-
- return 0;
-}
diff --git a/src/include/utils/load/ag_load_edges.h b/src/include/utils/load/ag_load_edges.h
index df663b1d..4db00d93 100644
--- a/src/include/utils/load/ag_load_edges.h
+++ b/src/include/utils/load/ag_load_edges.h
@@ -17,42 +17,28 @@
* under the License.
*/
-#include "access/heapam.h"
-#include "utils/load/age_load.h"
-
#ifndef AG_LOAD_EDGES_H
#define AG_LOAD_EDGES_H
-typedef struct {
- size_t row;
- char **header;
- size_t *header_len;
- size_t header_num;
- char **fields;
- size_t *fields_len;
- size_t alloc;
- size_t cur_field;
- int error;
- size_t header_row_length;
- size_t curr_row_length;
- char *graph_name;
- Oid graph_oid;
- char *label_name;
- int label_id;
- Oid label_seq_relid;
- char *start_vertex;
- char *end_vertex;
- bool load_as_agtype;
- batch_insert_state *batch_state;
-} csv_edge_reader;
-
-
-void edge_field_cb(void *field, size_t field_len, void *data);
-void edge_row_cb(int delim __attribute__((unused)), void *data);
+#include "utils/load/age_load.h"
+/*
+ * Load edges from a CSV file using pg's COPY infrastructure.
+ *
+ * CSV format: start_id, start_vertex_type, end_id, end_vertex_type, [properties...]
+ *
+ * Parameters:
+ * file_path - Path to the CSV file (must be in /tmp/age/)
+ * graph_name - Name of the graph
+ * graph_oid - OID of the graph
+ * label_name - Name of the edge label
+ * label_id - ID of the label
+ * load_as_agtype - If true, parse CSV values as agtype (JSON-like)
+ *
+ * Returns EXIT_SUCCESS on success.
+ */
 int create_edges_from_csv_file(char *file_path, char *graph_name, Oid graph_oid,
- char *label_name, int label_id,
- bool load_as_agtype);
-
-#endif /*AG_LOAD_EDGES_H */
+ char *label_name, int label_id,
+ bool load_as_agtype);
+#endif /* AG_LOAD_EDGES_H */
diff --git a/src/include/utils/load/ag_load_labels.h b/src/include/utils/load/ag_load_labels.h
index b8ed1572..c3d517f3 100644
--- a/src/include/utils/load/ag_load_labels.h
+++ b/src/include/utils/load/ag_load_labels.h
@@ -17,46 +17,26 @@
* under the License.
*/
-
#ifndef AG_LOAD_LABELS_H
#define AG_LOAD_LABELS_H
-#include "access/heapam.h"
#include "utils/load/age_load.h"
-struct counts {
- long unsigned fields;
- long unsigned allvalues;
- long unsigned rows;
-};
-
-typedef struct {
- size_t row;
- char **header;
- size_t *header_len;
- size_t header_num;
- char **fields;
- size_t *fields_len;
- size_t alloc;
- size_t cur_field;
- int error;
- size_t header_row_length;
- size_t curr_row_length;
- char *graph_name;
- Oid graph_oid;
- char *label_name;
- int label_id;
- Oid label_seq_relid;
- bool id_field_exists;
- bool load_as_agtype;
- int curr_seq_num;
- batch_insert_state *batch_state;
-} csv_vertex_reader;
-
-
-void vertex_field_cb(void *field, size_t field_len, void *data);
-void vertex_row_cb(int delim __attribute__((unused)), void *data);
-
+/*
+ * Load vertex labels from a CSV file using pg's COPY infrastructure.
+ * CSV format: [id,] [properties...]
+ *
+ * Parameters:
+ * file_path - Path to the CSV file (must be in /tmp/age/)
+ * graph_name - Name of the graph
+ * graph_oid - OID of the graph
+ * label_name - Name of the vertex label
+ * label_id - ID of the label
+ * id_field_exists - If true, first CSV column contains the vertex ID
+ * load_as_agtype - If true, parse CSV values as agtype (JSON-like)
+ *
+ * Returns EXIT_SUCCESS on success.
+ */
 int create_labels_from_csv_file(char *file_path, char *graph_name, Oid graph_oid,
char *label_name, int label_id,
bool id_field_exists, bool load_as_agtype);
diff --git a/src/include/utils/load/age_load.h b/src/include/utils/load/age_load.h
index 72f11493..6573c79f 100644
--- a/src/include/utils/load/age_load.h
+++ b/src/include/utils/load/age_load.h
@@ -17,6 +17,10 @@
* under the License.
*/
+#ifndef AG_LOAD_H
+#define AG_LOAD_H
+
+#include "access/heapam.h"
#include "commands/sequence.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
@@ -27,10 +31,8 @@
#include "commands/graph_commands.h"
#include "utils/ag_cache.h"
-#ifndef AGE_ENTITY_CREATOR_H
-#define AGE_ENTITY_CREATOR_H
-
#define BATCH_SIZE 1000
+#define MAX_BUFFERED_BYTES 65535 /* 64KB, same as pg COPY */
typedef struct batch_insert_state
{
@@ -38,26 +40,29 @@ typedef struct batch_insert_state
ResultRelInfo *resultRelInfo;
TupleTableSlot **slots;
int num_tuples;
- int max_tuples;
+ size_t buffered_bytes;
+ BulkInsertState bistate;
} batch_insert_state;
-agtype* create_empty_agtype(void);
-
-agtype* create_agtype_from_list(char **header, char **fields,
+agtype *create_empty_agtype(void);
+agtype *create_agtype_from_list(char **header, char **fields,
size_t fields_len, int64 vertex_id,
bool load_as_agtype);
-agtype* create_agtype_from_list_i(char **header, char **fields,
+agtype *create_agtype_from_list_i(char **header, char **fields,
size_t fields_len, size_t start_index,
bool load_as_agtype);
+
void insert_vertex_simple(Oid graph_oid, char *label_name, graphid vertex_id,
agtype *vertex_properties);
void insert_edge_simple(Oid graph_oid, char *label_name, graphid edge_id,
graphid start_id, graphid end_id,
- agtype* end_properties);
-void insert_batch(batch_insert_state *batch_state);
+ agtype *edge_properties);
void init_batch_insert(batch_insert_state **batch_state,
char *label_name, Oid graph_oid);
+void insert_batch(batch_insert_state *batch_state);
void finish_batch_insert(batch_insert_state **batch_state);
-#endif /* AGE_ENTITY_CREATOR_H */
+char *trim_whitespace(const char *str);
+
+#endif /* AG_LOAD_H */
diff --git a/src/include/utils/load/csv.h b/src/include/utils/load/csv.h
deleted file mode 100644
index 06253697..00000000
--- a/src/include/utils/load/csv.h
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Created by Shoaib on 12/5/2021.
-*/
-
-/*
-libcsv - parse and write csv data
-Copyright (C) 2008-2021 Robert Gamble
-This library is free software; you can redistribute it and/or
-modify it under the terms of the GNU Lesser General Public
-License as published by the Free Software Foundation; either
-version 2.1 of the License, or (at your option) any later version.
-This library is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-Lesser General Public License for more details.
-You should have received a copy of the GNU Lesser General Public
-License along with this library; if not, write to the Free Software
-Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-*/
-
-#ifndef LIBCSV_H__
-#define LIBCSV_H__
-#include <stdlib.h>
-#include <stdio.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-#define CSV_MAJOR 3
-#define CSV_MINOR 0
-#define CSV_RELEASE 3
-
-/* Error Codes */
-#define CSV_SUCCESS 0
-#define CSV_EPARSE 1 /* Parse error in strict mode */
-#define CSV_ENOMEM 2 /* Out of memory while increasing buffer size */
-#define CSV_ETOOBIG 3 /* Buffer larger than SIZE_MAX needed */
-#define CSV_EINVALID 4 /* Invalid code,should never be received from csv_error*/
-
-
-/* parser options */
-#define CSV_STRICT 1 /* enable strict mode */
-#define CSV_REPALL_NL 2 /* report all unquoted carriage returns and linefeeds */
-#define CSV_STRICT_FINI 4 /* causes csv_fini to return CSV_EPARSE if last
- field is quoted and doesn't contain ending
- quote */
-#define CSV_APPEND_NULL 8 /* Ensure that all fields are null-terminated */
-#define CSV_EMPTY_IS_NULL 16 /* Pass null pointer to cb1 function when
- empty, unquoted fields are encountered */
-
-
-/* Character values */
-#define CSV_TAB 0x09
-#define CSV_SPACE 0x20
-#define CSV_CR 0x0d
-#define CSV_LF 0x0a
-#define CSV_COMMA 0x2c
-#define CSV_QUOTE 0x22
-
-struct csv_parser {
- int pstate; /* Parser state */
- int quoted; /* Is the current field a quoted field? */
- size_t spaces; /* Number of continuous spaces after quote or in a non-quoted field */
- unsigned char * entry_buf; /* Entry buffer */
- size_t entry_pos; /* Current position in entry_buf (and current size of entry) */
- size_t entry_size; /* Size of entry buffer */
- int status; /* Operation status */
- unsigned char options;
- unsigned char quote_char;
- unsigned char delim_char;
- int (*is_space)(unsigned char);
- int (*is_term)(unsigned char);
- size_t blk_size;
- void *(*malloc_func)(size_t); /* not used */
- void *(*realloc_func)(void *, size_t); /* function used to allocate buffer memory */
- void (*free_func)(void *); /* function used to free buffer memory */
-};
-
-/* Function Prototypes */
-int csv_init(struct csv_parser *p, unsigned char options);
-int csv_fini(struct csv_parser *p, void (*cb1)(void *, size_t, void *), void (*cb2)(int, void *), void *data);
-void csv_free(struct csv_parser *p);
-int csv_error(const struct csv_parser *p);
-const char * csv_strerror(int error);
-size_t csv_parse(struct csv_parser *p, const void *s, size_t len, void (*cb1)(void *, size_t, void *), void (*cb2)(int, void *), void *data);
-size_t csv_write(void *dest, size_t dest_size, const void *src, size_t src_size);
-int csv_fwrite(FILE *fp, const void *src, size_t src_size);
-size_t csv_write2(void *dest, size_t dest_size, const void *src, size_t src_size, unsigned char quote);
-int csv_fwrite2(FILE *fp, const void *src, size_t src_size, unsigned char quote);
-int csv_get_opts(const struct csv_parser *p);
-int csv_set_opts(struct csv_parser *p, unsigned char options);
-void csv_set_delim(struct csv_parser *p, unsigned char c);
-void csv_set_quote(struct csv_parser *p, unsigned char c);
-unsigned char csv_get_delim(const struct csv_parser *p);
-unsigned char csv_get_quote(const struct csv_parser *p);
-void csv_set_space_func(struct csv_parser *p, int (*f)(unsigned char));
-void csv_set_term_func(struct csv_parser *p, int (*f)(unsigned char));
-void csv_set_realloc_func(struct csv_parser *p, void *(*)(void *, size_t));
-void csv_set_free_func(struct csv_parser *p, void (*)(void *));
-void csv_set_blk_size(struct csv_parser *p, size_t);
-size_t csv_get_buffer_size(const struct csv_parser *p);
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif