This is an automated email from the ASF dual-hosted git repository. djwang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry.git
commit db0818f586ef4974a4d2a328130cf104fed08c3a Author: NJrslv <[email protected]> AuthorDate: Thu Sep 4 13:26:16 2025 +0300 [yagp_hooks_collector] Add regression tests, ANALYZE text output, and UTF-8 trimming Add PG-style regression tests. Enable sending EXPLAIN ANALYZE as text. Add utility statement hook coverage. Implement UTF-8 safe trimming: discard partial multi-byte characters at cut boundaries. Clean up stray gmon.out. --- expected/yagp_cursors.out | 165 +++++++++++++ expected/yagp_dist.out | 177 ++++++++++++++ expected/yagp_select.out | 138 +++++++++++ expected/yagp_utf8_trim.out | 66 +++++ expected/yagp_utility.out | 272 +++++++++++++++++++++ metric.md | 49 ++-- sql/yagp_cursors.sql | 83 +++++++ sql/yagp_dist.sql | 86 +++++++ sql/yagp_select.sql | 67 +++++ sql/yagp_utf8_trim.sql | 43 ++++ sql/yagp_utility.sql | 133 ++++++++++ src/Config.cpp | 36 ++- src/Config.h | 5 + src/EventSender.cpp | 191 +++++++++------ src/EventSender.h | 18 +- src/PgUtils.cpp | 5 - src/PgUtils.h | 3 - src/ProtoUtils.cpp | 69 ++++-- src/ProtoUtils.h | 5 +- src/UDSConnector.cpp | 3 +- src/UDSConnector.h | 4 +- src/hook_wrappers.cpp | 60 ++++- src/hook_wrappers.h | 3 + src/log/LogOps.cpp | 131 ++++++++++ src/log/LogOps.h | 19 ++ src/log/LogSchema.cpp | 135 ++++++++++ src/log/LogSchema.h | 166 +++++++++++++ src/memory/gpdbwrappers.cpp | 13 +- src/memory/gpdbwrappers.h | 8 +- src/yagp_hooks_collector.c | 14 +- yagp_hooks_collector--1.0--1.1.sql | 113 +++++++++ ...ector--1.0.sql => yagp_hooks_collector--1.0.sql | 2 +- yagp_hooks_collector--1.1.sql | 95 +++++++ yagp_hooks_collector.control | 2 +- 34 files changed, 2224 insertions(+), 155 deletions(-) diff --git a/expected/yagp_cursors.out b/expected/yagp_cursors.out new file mode 100644 index 00000000000..9587c00b550 --- /dev/null +++ b/expected/yagp_cursors.out @@ -0,0 +1,165 @@ +CREATE EXTENSION yagp_hooks_collector; +CREATE FUNCTION yagp_status_order(status text) +RETURNS integer +AS $$ +BEGIN + RETURN CASE status + WHEN 
'QUERY_STATUS_SUBMIT' THEN 1 + WHEN 'QUERY_STATUS_START' THEN 2 + WHEN 'QUERY_STATUS_END' THEN 3 + WHEN 'QUERY_STATUS_DONE' THEN 4 + ELSE 999 + END; +END; +$$ LANGUAGE plpgsql IMMUTABLE; +SET yagpcc.enable TO TRUE; +SET yagpcc.enable_utility TO TRUE; +SET yagpcc.report_nested_queries TO TRUE; +-- DECLARE +SET yagpcc.logging_mode to 'TBL'; +BEGIN; +DECLARE cursor_stats_0 CURSOR FOR SELECT 0; +CLOSE cursor_stats_0; +COMMIT; +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; + segid | query_text | query_status +-------+---------------------------------------------+--------------------- + -1 | | QUERY_STATUS_DONE + -1 | BEGIN; | QUERY_STATUS_SUBMIT + -1 | BEGIN; | QUERY_STATUS_DONE + -1 | DECLARE cursor_stats_0 CURSOR FOR SELECT 0; | QUERY_STATUS_SUBMIT + -1 | DECLARE cursor_stats_0 CURSOR FOR SELECT 0; | QUERY_STATUS_DONE + -1 | CLOSE cursor_stats_0; | QUERY_STATUS_SUBMIT + -1 | CLOSE cursor_stats_0; | QUERY_STATUS_DONE + -1 | COMMIT; | QUERY_STATUS_SUBMIT + -1 | COMMIT; | QUERY_STATUS_DONE + -1 | RESET yagpcc.logging_mode; | QUERY_STATUS_SUBMIT +(10 rows) + +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + t +--- + t +(1 row) + +-- DECLARE WITH HOLD +SET yagpcc.logging_mode to 'TBL'; +BEGIN; +DECLARE cursor_stats_1 CURSOR WITH HOLD FOR SELECT 1; +CLOSE cursor_stats_1; +DECLARE cursor_stats_2 CURSOR WITH HOLD FOR SELECT 2; +CLOSE cursor_stats_2; +COMMIT; +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; + segid | query_text | query_status +-------+-------------------------------------------------------+--------------------- + -1 | | QUERY_STATUS_DONE + -1 | BEGIN; | QUERY_STATUS_SUBMIT + -1 | BEGIN; | QUERY_STATUS_DONE + -1 | DECLARE cursor_stats_1 CURSOR WITH HOLD FOR SELECT 1; | QUERY_STATUS_SUBMIT + -1 | DECLARE 
cursor_stats_1 CURSOR WITH HOLD FOR SELECT 1; | QUERY_STATUS_DONE + -1 | CLOSE cursor_stats_1; | QUERY_STATUS_SUBMIT + -1 | CLOSE cursor_stats_1; | QUERY_STATUS_DONE + -1 | DECLARE cursor_stats_2 CURSOR WITH HOLD FOR SELECT 2; | QUERY_STATUS_SUBMIT + -1 | DECLARE cursor_stats_2 CURSOR WITH HOLD FOR SELECT 2; | QUERY_STATUS_DONE + -1 | CLOSE cursor_stats_2; | QUERY_STATUS_SUBMIT + -1 | CLOSE cursor_stats_2; | QUERY_STATUS_DONE + -1 | COMMIT; | QUERY_STATUS_SUBMIT + -1 | COMMIT; | QUERY_STATUS_DONE + -1 | RESET yagpcc.logging_mode; | QUERY_STATUS_SUBMIT +(14 rows) + +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + t +--- + t +(1 row) + +-- ROLLBACK +SET yagpcc.logging_mode to 'TBL'; +BEGIN; +DECLARE cursor_stats_3 CURSOR FOR SELECT 1; +CLOSE cursor_stats_3; +DECLARE cursor_stats_4 CURSOR FOR SELECT 1; +ROLLBACK; +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; + segid | query_text | query_status +-------+---------------------------------------------+--------------------- + -1 | | QUERY_STATUS_DONE + -1 | BEGIN; | QUERY_STATUS_SUBMIT + -1 | BEGIN; | QUERY_STATUS_DONE + -1 | DECLARE cursor_stats_3 CURSOR FOR SELECT 1; | QUERY_STATUS_SUBMIT + -1 | DECLARE cursor_stats_3 CURSOR FOR SELECT 1; | QUERY_STATUS_DONE + -1 | CLOSE cursor_stats_3; | QUERY_STATUS_SUBMIT + -1 | CLOSE cursor_stats_3; | QUERY_STATUS_DONE + -1 | DECLARE cursor_stats_4 CURSOR FOR SELECT 1; | QUERY_STATUS_SUBMIT + -1 | DECLARE cursor_stats_4 CURSOR FOR SELECT 1; | QUERY_STATUS_DONE + -1 | ROLLBACK; | QUERY_STATUS_SUBMIT + -1 | ROLLBACK; | QUERY_STATUS_DONE + -1 | RESET yagpcc.logging_mode; | QUERY_STATUS_SUBMIT +(12 rows) + +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + t +--- + t +(1 row) + +-- FETCH +SET yagpcc.logging_mode to 'TBL'; +BEGIN; +DECLARE cursor_stats_5 CURSOR WITH HOLD FOR SELECT 2; +DECLARE cursor_stats_6 CURSOR WITH HOLD FOR SELECT 3; +FETCH 1 IN 
cursor_stats_5; + ?column? +---------- + 2 +(1 row) + +FETCH 1 IN cursor_stats_6; + ?column? +---------- + 3 +(1 row) + +CLOSE cursor_stats_5; +CLOSE cursor_stats_6; +COMMIT; +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; + segid | query_text | query_status +-------+-------------------------------------------------------+--------------------- + -1 | | QUERY_STATUS_DONE + -1 | BEGIN; | QUERY_STATUS_SUBMIT + -1 | BEGIN; | QUERY_STATUS_DONE + -1 | DECLARE cursor_stats_5 CURSOR WITH HOLD FOR SELECT 2; | QUERY_STATUS_SUBMIT + -1 | DECLARE cursor_stats_5 CURSOR WITH HOLD FOR SELECT 2; | QUERY_STATUS_DONE + -1 | DECLARE cursor_stats_6 CURSOR WITH HOLD FOR SELECT 3; | QUERY_STATUS_SUBMIT + -1 | DECLARE cursor_stats_6 CURSOR WITH HOLD FOR SELECT 3; | QUERY_STATUS_DONE + -1 | FETCH 1 IN cursor_stats_5; | QUERY_STATUS_SUBMIT + -1 | FETCH 1 IN cursor_stats_5; | QUERY_STATUS_DONE + -1 | FETCH 1 IN cursor_stats_6; | QUERY_STATUS_SUBMIT + -1 | FETCH 1 IN cursor_stats_6; | QUERY_STATUS_DONE + -1 | CLOSE cursor_stats_5; | QUERY_STATUS_SUBMIT + -1 | CLOSE cursor_stats_5; | QUERY_STATUS_DONE + -1 | CLOSE cursor_stats_6; | QUERY_STATUS_SUBMIT + -1 | CLOSE cursor_stats_6; | QUERY_STATUS_DONE + -1 | COMMIT; | QUERY_STATUS_SUBMIT + -1 | COMMIT; | QUERY_STATUS_DONE + -1 | RESET yagpcc.logging_mode; | QUERY_STATUS_SUBMIT +(18 rows) + +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + t +--- + t +(1 row) + +DROP FUNCTION yagp_status_order(text); +DROP EXTENSION yagp_hooks_collector; +RESET yagpcc.enable; +RESET yagpcc.report_nested_queries; +RESET yagpcc.enable_utility; diff --git a/expected/yagp_dist.out b/expected/yagp_dist.out new file mode 100644 index 00000000000..ebaf839601d --- /dev/null +++ b/expected/yagp_dist.out @@ -0,0 +1,177 @@ +CREATE EXTENSION yagp_hooks_collector; +CREATE OR REPLACE FUNCTION yagp_status_order(status text) +RETURNS integer +AS $$ 
+BEGIN + RETURN CASE status + WHEN 'QUERY_STATUS_SUBMIT' THEN 1 + WHEN 'QUERY_STATUS_START' THEN 2 + WHEN 'QUERY_STATUS_END' THEN 3 + WHEN 'QUERY_STATUS_DONE' THEN 4 + ELSE 999 + END; +END; +$$ LANGUAGE plpgsql IMMUTABLE; +SET yagpcc.enable TO TRUE; +SET yagpcc.report_nested_queries TO TRUE; +SET yagpcc.enable_utility TO FALSE; +-- Hash distributed table +CREATE TABLE test_hash_dist (id int) DISTRIBUTED BY (id); +INSERT INTO test_hash_dist SELECT 1; +SET yagpcc.logging_mode to 'TBL'; +SET optimizer_enable_direct_dispatch TO TRUE; +-- Direct dispatch is used here, only one segment is scanned. +select * from test_hash_dist where id = 1; + id +---- + 1 +(1 row) + +RESET optimizer_enable_direct_dispatch; +RESET yagpcc.logging_mode; +-- Should see 8 rows. +SELECT segid, query_text, query_status FROM yagpcc.log ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; + segid | query_text | query_status +-------+--------------------------------------------+--------------------- + -1 | select * from test_hash_dist where id = 1; | QUERY_STATUS_SUBMIT + -1 | select * from test_hash_dist where id = 1; | QUERY_STATUS_START + -1 | select * from test_hash_dist where id = 1; | QUERY_STATUS_END + -1 | select * from test_hash_dist where id = 1; | QUERY_STATUS_DONE + 1 | | QUERY_STATUS_SUBMIT + 1 | | QUERY_STATUS_START + 1 | | QUERY_STATUS_END + 1 | | QUERY_STATUS_DONE +(8 rows) + +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + t +--- + t +(1 row) + +SET yagpcc.logging_mode to 'TBL'; +-- Scan all segments. 
+select * from test_hash_dist; + id +---- + 1 +(1 row) + +DROP TABLE test_hash_dist; +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; + segid | query_text | query_status +-------+-------------------------------+--------------------- + -1 | select * from test_hash_dist; | QUERY_STATUS_SUBMIT + -1 | select * from test_hash_dist; | QUERY_STATUS_START + -1 | select * from test_hash_dist; | QUERY_STATUS_END + -1 | select * from test_hash_dist; | QUERY_STATUS_DONE + 1 | | QUERY_STATUS_SUBMIT + 1 | | QUERY_STATUS_START + 1 | | QUERY_STATUS_END + 1 | | QUERY_STATUS_DONE + 2 | | QUERY_STATUS_SUBMIT + 2 | | QUERY_STATUS_START + 2 | | QUERY_STATUS_END + 2 | | QUERY_STATUS_DONE + | | QUERY_STATUS_SUBMIT + | | QUERY_STATUS_START + | | QUERY_STATUS_END + | | QUERY_STATUS_DONE +(16 rows) + +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + t +--- + t +(1 row) + +-- Replicated table +CREATE FUNCTION force_segments() RETURNS SETOF text AS $$ +BEGIN + RETURN NEXT 'seg'; +END; +$$ LANGUAGE plpgsql VOLATILE EXECUTE ON ALL SEGMENTS; +CREATE TABLE test_replicated (id int) DISTRIBUTED REPLICATED; +INSERT INTO test_replicated SELECT 1; +SET yagpcc.logging_mode to 'TBL'; +SELECT COUNT(*) FROM test_replicated, force_segments(); + count +------- + 3 +(1 row) + +DROP TABLE test_replicated; +DROP FUNCTION force_segments(); +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; + segid | query_text | query_status +-------+---------------------------------------------------------+--------------------- + -1 | SELECT COUNT(*) FROM test_replicated, force_segments(); | QUERY_STATUS_SUBMIT + -1 | SELECT COUNT(*) FROM test_replicated, force_segments(); | QUERY_STATUS_START + -1 | SELECT COUNT(*) FROM test_replicated, force_segments(); | QUERY_STATUS_END + -1 | SELECT COUNT(*) FROM test_replicated, force_segments(); | 
QUERY_STATUS_DONE + 1 | | QUERY_STATUS_SUBMIT + 1 | | QUERY_STATUS_START + 1 | | QUERY_STATUS_END + 1 | | QUERY_STATUS_DONE + 2 | | QUERY_STATUS_SUBMIT + 2 | | QUERY_STATUS_START + 2 | | QUERY_STATUS_END + 2 | | QUERY_STATUS_DONE + | | QUERY_STATUS_SUBMIT + | | QUERY_STATUS_START + | | QUERY_STATUS_END + | | QUERY_STATUS_DONE +(16 rows) + +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + t +--- + t +(1 row) + +-- Partially distributed table (2 numsegments) +SET allow_system_table_mods = ON; +CREATE TABLE test_partial_dist (id int, data text) DISTRIBUTED BY (id); +UPDATE gp_distribution_policy SET numsegments = 2 WHERE localoid = 'test_partial_dist'::regclass; +INSERT INTO test_partial_dist SELECT * FROM generate_series(1, 100); +SET yagpcc.logging_mode to 'TBL'; +SELECT COUNT(*) FROM test_partial_dist; + count +------- + 100 +(1 row) + +RESET yagpcc.logging_mode; +DROP TABLE test_partial_dist; +RESET allow_system_table_mods; +-- Should see 12 rows. +SELECT query_text, query_status FROM yagpcc.log ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; + query_text | query_status +-----------------------------------------+--------------------- + SELECT COUNT(*) FROM test_partial_dist; | QUERY_STATUS_SUBMIT + SELECT COUNT(*) FROM test_partial_dist; | QUERY_STATUS_START + SELECT COUNT(*) FROM test_partial_dist; | QUERY_STATUS_END + SELECT COUNT(*) FROM test_partial_dist; | QUERY_STATUS_DONE + | QUERY_STATUS_SUBMIT + | QUERY_STATUS_START + | QUERY_STATUS_END + | QUERY_STATUS_DONE + | QUERY_STATUS_SUBMIT + | QUERY_STATUS_START + | QUERY_STATUS_END + | QUERY_STATUS_DONE +(12 rows) + +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + t +--- + t +(1 row) + +DROP FUNCTION yagp_status_order(text); +DROP EXTENSION yagp_hooks_collector; +RESET yagpcc.enable; +RESET yagpcc.report_nested_queries; +RESET yagpcc.enable_utility; diff --git a/expected/yagp_select.out b/expected/yagp_select.out new file mode 100644 index 00000000000..4c4a0218150 --- /dev/null +++ 
b/expected/yagp_select.out @@ -0,0 +1,138 @@ +CREATE EXTENSION yagp_hooks_collector; +CREATE OR REPLACE FUNCTION yagp_status_order(status text) +RETURNS integer +AS $$ +BEGIN + RETURN CASE status + WHEN 'QUERY_STATUS_SUBMIT' THEN 1 + WHEN 'QUERY_STATUS_START' THEN 2 + WHEN 'QUERY_STATUS_END' THEN 3 + WHEN 'QUERY_STATUS_DONE' THEN 4 + ELSE 999 + END; +END; +$$ LANGUAGE plpgsql IMMUTABLE; +SET yagpcc.enable TO TRUE; +SET yagpcc.report_nested_queries TO TRUE; +SET yagpcc.enable_utility TO FALSE; +-- Basic SELECT tests +SET yagpcc.logging_mode to 'TBL'; +SELECT 1; + ?column? +---------- + 1 +(1 row) + +SELECT COUNT(*) FROM generate_series(1,10); + count +------- + 10 +(1 row) + +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; + segid | query_text | query_status +-------+---------------------------------------------+--------------------- + -1 | SELECT 1; | QUERY_STATUS_SUBMIT + -1 | SELECT 1; | QUERY_STATUS_START + -1 | SELECT 1; | QUERY_STATUS_END + -1 | SELECT 1; | QUERY_STATUS_DONE + -1 | SELECT COUNT(*) FROM generate_series(1,10); | QUERY_STATUS_SUBMIT + -1 | SELECT COUNT(*) FROM generate_series(1,10); | QUERY_STATUS_START + -1 | SELECT COUNT(*) FROM generate_series(1,10); | QUERY_STATUS_END + -1 | SELECT COUNT(*) FROM generate_series(1,10); | QUERY_STATUS_DONE +(8 rows) + +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + t +--- + t +(1 row) + +-- Transaction test +SET yagpcc.logging_mode to 'TBL'; +BEGIN; +SELECT 1; + ?column? 
+---------- + 1 +(1 row) + +COMMIT; +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; + segid | query_text | query_status +-------+------------+--------------------- + -1 | SELECT 1; | QUERY_STATUS_SUBMIT + -1 | SELECT 1; | QUERY_STATUS_START + -1 | SELECT 1; | QUERY_STATUS_END + -1 | SELECT 1; | QUERY_STATUS_DONE +(4 rows) + +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + t +--- + t +(1 row) + +-- CTE test +SET yagpcc.logging_mode to 'TBL'; +WITH t AS (VALUES (1), (2)) +SELECT * FROM t; + column1 +--------- + 1 + 2 +(2 rows) + +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; + segid | query_text | query_status +-------+-----------------------------+--------------------- + -1 | WITH t AS (VALUES (1), (2))+| QUERY_STATUS_SUBMIT + | SELECT * FROM t; | + -1 | WITH t AS (VALUES (1), (2))+| QUERY_STATUS_START + | SELECT * FROM t; | + -1 | WITH t AS (VALUES (1), (2))+| QUERY_STATUS_END + | SELECT * FROM t; | + -1 | WITH t AS (VALUES (1), (2))+| QUERY_STATUS_DONE + | SELECT * FROM t; | +(4 rows) + +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + t +--- + t +(1 row) + +-- Prepared statement test +SET yagpcc.logging_mode to 'TBL'; +PREPARE test_stmt AS SELECT 1; +EXECUTE test_stmt; + ?column? 
+---------- + 1 +(1 row) + +DEALLOCATE test_stmt; +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; + segid | query_text | query_status +-------+--------------------------------+--------------------- + -1 | PREPARE test_stmt AS SELECT 1; | QUERY_STATUS_SUBMIT + -1 | PREPARE test_stmt AS SELECT 1; | QUERY_STATUS_START + -1 | PREPARE test_stmt AS SELECT 1; | QUERY_STATUS_END + -1 | PREPARE test_stmt AS SELECT 1; | QUERY_STATUS_DONE +(4 rows) + +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + t +--- + t +(1 row) + +DROP FUNCTION yagp_status_order(text); +DROP EXTENSION yagp_hooks_collector; +RESET yagpcc.enable; +RESET yagpcc.report_nested_queries; +RESET yagpcc.enable_utility; diff --git a/expected/yagp_utf8_trim.out b/expected/yagp_utf8_trim.out new file mode 100644 index 00000000000..194ee6b3609 --- /dev/null +++ b/expected/yagp_utf8_trim.out @@ -0,0 +1,66 @@ +CREATE EXTENSION IF NOT EXISTS yagp_hooks_collector; +CREATE OR REPLACE FUNCTION get_marked_query(marker TEXT) +RETURNS TEXT AS $$ + SELECT query_text + FROM yagpcc.log + WHERE query_text LIKE '%' || marker || '%' + ORDER BY datetime DESC + LIMIT 1 +$$ LANGUAGE sql VOLATILE; +SET yagpcc.enable TO TRUE; +-- Test 1: 1 byte chars +SET yagpcc.max_text_size to 19; +SET yagpcc.logging_mode to 'TBL'; +SELECT /*test1*/ 'HelloWorld'; + ?column? +------------ + HelloWorld +(1 row) + +RESET yagpcc.logging_mode; +SELECT octet_length(get_marked_query('test1')) = 19 AS correct_length; + correct_length +---------------- + t +(1 row) + +-- Test 2: 2 byte chars +SET yagpcc.max_text_size to 19; +SET yagpcc.logging_mode to 'TBL'; +SELECT /*test2*/ 'РУССКИЙЯЗЫК'; + ?column? +------------- + РУССКИЙЯЗЫК +(1 row) + +RESET yagpcc.logging_mode; +-- Character 'Р' has two bytes and cut in the middle => not included. 
+SELECT octet_length(get_marked_query('test2')) = 18 AS correct_length; + correct_length +---------------- + t +(1 row) + +-- Test 3: 4 byte chars +SET yagpcc.max_text_size to 21; +SET yagpcc.logging_mode to 'TBL'; +SELECT /*test3*/ '😀'; + ?column? +---------- + 😀 +(1 row) + +RESET yagpcc.logging_mode; +-- Emoji has 4 bytes and cut before the last byte => not included. +SELECT octet_length(get_marked_query('test3')) = 18 AS correct_length; + correct_length +---------------- + t +(1 row) + +-- Cleanup +DROP FUNCTION get_marked_query(TEXT); +RESET yagpcc.max_text_size; +RESET yagpcc.logging_mode; +RESET yagpcc.enable; +DROP EXTENSION yagp_hooks_collector; diff --git a/expected/yagp_utility.out b/expected/yagp_utility.out new file mode 100644 index 00000000000..03c17713575 --- /dev/null +++ b/expected/yagp_utility.out @@ -0,0 +1,272 @@ +CREATE EXTENSION yagp_hooks_collector; +CREATE OR REPLACE FUNCTION yagp_status_order(status text) +RETURNS integer +AS $$ +BEGIN + RETURN CASE status + WHEN 'QUERY_STATUS_SUBMIT' THEN 1 + WHEN 'QUERY_STATUS_START' THEN 2 + WHEN 'QUERY_STATUS_END' THEN 3 + WHEN 'QUERY_STATUS_DONE' THEN 4 + ELSE 999 + END; +END; +$$ LANGUAGE plpgsql IMMUTABLE; +SET yagpcc.enable TO TRUE; +SET yagpcc.enable_utility TO TRUE; +SET yagpcc.report_nested_queries TO TRUE; +SET yagpcc.logging_mode to 'TBL'; +CREATE TABLE test_table (a int, b text); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Greenplum Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
+CREATE INDEX test_idx ON test_table(a); +ALTER TABLE test_table ADD COLUMN c int DEFAULT 1; +DROP TABLE test_table; +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; + segid | query_text | query_status +-------+----------------------------------------------------+--------------------- + -1 | | QUERY_STATUS_DONE + -1 | CREATE TABLE test_table (a int, b text); | QUERY_STATUS_SUBMIT + -1 | CREATE TABLE test_table (a int, b text); | QUERY_STATUS_DONE + -1 | CREATE INDEX test_idx ON test_table(a); | QUERY_STATUS_SUBMIT + -1 | CREATE INDEX test_idx ON test_table(a); | QUERY_STATUS_DONE + -1 | ALTER TABLE test_table ADD COLUMN c int DEFAULT 1; | QUERY_STATUS_SUBMIT + -1 | ALTER TABLE test_table ADD COLUMN c int DEFAULT 1; | QUERY_STATUS_DONE + -1 | DROP TABLE test_table; | QUERY_STATUS_SUBMIT + -1 | DROP TABLE test_table; | QUERY_STATUS_DONE + -1 | RESET yagpcc.logging_mode; | QUERY_STATUS_SUBMIT +(10 rows) + +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + t +--- + t +(1 row) + +-- Partitioning +SET yagpcc.logging_mode to 'TBL'; +CREATE TABLE pt_test (a int, b int) +DISTRIBUTED BY (a) +PARTITION BY RANGE (a) +(START (0) END (100) EVERY (50)); +NOTICE: CREATE TABLE will create partition "pt_test_1_prt_1" for table "pt_test" +NOTICE: CREATE TABLE will create partition "pt_test_1_prt_2" for table "pt_test" +DROP TABLE pt_test; +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; + segid | query_text | query_status +-------+-------------------------------------+--------------------- + -1 | | QUERY_STATUS_DONE + -1 | CREATE TABLE pt_test (a int, b int)+| QUERY_STATUS_SUBMIT + | DISTRIBUTED BY (a) +| + | PARTITION BY RANGE (a) +| + | (START (0) END (100) EVERY (50)); | + -1 | CREATE TABLE pt_test (a int, b int)+| 
QUERY_STATUS_SUBMIT + | DISTRIBUTED BY (a) +| + | PARTITION BY RANGE (a) +| + | (START (0) END (100) EVERY (50)); | + -1 | CREATE TABLE pt_test (a int, b int)+| QUERY_STATUS_SUBMIT + | DISTRIBUTED BY (a) +| + | PARTITION BY RANGE (a) +| + | (START (0) END (100) EVERY (50)); | + -1 | CREATE TABLE pt_test (a int, b int)+| QUERY_STATUS_DONE + | DISTRIBUTED BY (a) +| + | PARTITION BY RANGE (a) +| + | (START (0) END (100) EVERY (50)); | + -1 | CREATE TABLE pt_test (a int, b int)+| QUERY_STATUS_DONE + | DISTRIBUTED BY (a) +| + | PARTITION BY RANGE (a) +| + | (START (0) END (100) EVERY (50)); | + -1 | CREATE TABLE pt_test (a int, b int)+| QUERY_STATUS_DONE + | DISTRIBUTED BY (a) +| + | PARTITION BY RANGE (a) +| + | (START (0) END (100) EVERY (50)); | + -1 | DROP TABLE pt_test; | QUERY_STATUS_SUBMIT + -1 | DROP TABLE pt_test; | QUERY_STATUS_DONE + -1 | RESET yagpcc.logging_mode; | QUERY_STATUS_SUBMIT +(10 rows) + +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + t +--- + t +(1 row) + +-- Views and Functions +SET yagpcc.logging_mode to 'TBL'; +CREATE VIEW test_view AS SELECT 1 AS a; +CREATE FUNCTION test_func(i int) RETURNS int AS $$ SELECT $1 + 1; $$ LANGUAGE SQL; +DROP VIEW test_view; +DROP FUNCTION test_func(int); +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; + segid | query_text | query_status +-------+------------------------------------------------------------------------------------+--------------------- + -1 | | QUERY_STATUS_DONE + -1 | CREATE VIEW test_view AS SELECT 1 AS a; | QUERY_STATUS_SUBMIT + -1 | CREATE VIEW test_view AS SELECT 1 AS a; | QUERY_STATUS_DONE + -1 | CREATE FUNCTION test_func(i int) RETURNS int AS $$ SELECT $1 + 1; $$ LANGUAGE SQL; | QUERY_STATUS_SUBMIT + -1 | CREATE FUNCTION test_func(i int) RETURNS int AS $$ SELECT $1 + 1; $$ LANGUAGE SQL; | QUERY_STATUS_DONE + -1 | DROP VIEW test_view; | QUERY_STATUS_SUBMIT + -1 
| DROP VIEW test_view; | QUERY_STATUS_DONE + -1 | DROP FUNCTION test_func(int); | QUERY_STATUS_SUBMIT + -1 | DROP FUNCTION test_func(int); | QUERY_STATUS_DONE + -1 | RESET yagpcc.logging_mode; | QUERY_STATUS_SUBMIT +(10 rows) + +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + t +--- + t +(1 row) + +-- Transaction Operations +SET yagpcc.logging_mode to 'TBL'; +BEGIN; +SAVEPOINT sp1; +ROLLBACK TO sp1; +COMMIT; +BEGIN; +SAVEPOINT sp2; +ABORT; +BEGIN; +ROLLBACK; +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; + segid | query_text | query_status +-------+----------------------------+--------------------- + -1 | | QUERY_STATUS_DONE + -1 | BEGIN; | QUERY_STATUS_SUBMIT + -1 | BEGIN; | QUERY_STATUS_DONE + -1 | SAVEPOINT sp1; | QUERY_STATUS_SUBMIT + -1 | ROLLBACK TO sp1; | QUERY_STATUS_SUBMIT + -1 | ROLLBACK TO sp1; | QUERY_STATUS_DONE + -1 | COMMIT; | QUERY_STATUS_SUBMIT + -1 | COMMIT; | QUERY_STATUS_DONE + -1 | BEGIN; | QUERY_STATUS_SUBMIT + -1 | BEGIN; | QUERY_STATUS_DONE + -1 | SAVEPOINT sp2; | QUERY_STATUS_SUBMIT + -1 | ABORT; | QUERY_STATUS_SUBMIT + -1 | ABORT; | QUERY_STATUS_DONE + -1 | BEGIN; | QUERY_STATUS_SUBMIT + -1 | BEGIN; | QUERY_STATUS_DONE + -1 | ROLLBACK; | QUERY_STATUS_SUBMIT + -1 | ROLLBACK; | QUERY_STATUS_DONE + -1 | RESET yagpcc.logging_mode; | QUERY_STATUS_SUBMIT +(18 rows) + +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + t +--- + t +(1 row) + +-- DML Operations +SET yagpcc.logging_mode to 'TBL'; +CREATE TABLE dml_test (a int, b text); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Greenplum Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
+INSERT INTO dml_test VALUES (1, 'test'); +UPDATE dml_test SET b = 'updated' WHERE a = 1; +DELETE FROM dml_test WHERE a = 1; +DROP TABLE dml_test; +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; + segid | query_text | query_status +-------+----------------------------------------+--------------------- + -1 | | QUERY_STATUS_DONE + -1 | CREATE TABLE dml_test (a int, b text); | QUERY_STATUS_SUBMIT + -1 | CREATE TABLE dml_test (a int, b text); | QUERY_STATUS_DONE + -1 | DROP TABLE dml_test; | QUERY_STATUS_SUBMIT + -1 | DROP TABLE dml_test; | QUERY_STATUS_DONE + -1 | RESET yagpcc.logging_mode; | QUERY_STATUS_SUBMIT +(6 rows) + +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + t +--- + t +(1 row) + +-- COPY Operations +SET yagpcc.logging_mode to 'TBL'; +CREATE TABLE copy_test (a int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Greenplum Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
+COPY (SELECT 1) TO STDOUT; +1 +DROP TABLE copy_test; +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; + segid | query_text | query_status +-------+---------------------------------+--------------------- + -1 | | QUERY_STATUS_DONE + -1 | CREATE TABLE copy_test (a int); | QUERY_STATUS_SUBMIT + -1 | CREATE TABLE copy_test (a int); | QUERY_STATUS_DONE + -1 | COPY (SELECT 1) TO STDOUT; | QUERY_STATUS_SUBMIT + -1 | COPY (SELECT 1) TO STDOUT; | QUERY_STATUS_DONE + -1 | DROP TABLE copy_test; | QUERY_STATUS_SUBMIT + -1 | DROP TABLE copy_test; | QUERY_STATUS_DONE + -1 | RESET yagpcc.logging_mode; | QUERY_STATUS_SUBMIT +(8 rows) + +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + t +--- + t +(1 row) + +-- Prepared Statements and error during execute +SET yagpcc.logging_mode to 'TBL'; +PREPARE test_prep(int) AS SELECT $1/0 AS value; +EXECUTE test_prep(0::int); +ERROR: division by zero +DEALLOCATE test_prep; +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; + segid | query_text | query_status +-------+-------------------------------------------------+--------------------- + -1 | | QUERY_STATUS_DONE + -1 | PREPARE test_prep(int) AS SELECT $1/0 AS value; | QUERY_STATUS_SUBMIT + -1 | PREPARE test_prep(int) AS SELECT $1/0 AS value; | QUERY_STATUS_DONE + -1 | EXECUTE test_prep(0::int); | QUERY_STATUS_SUBMIT + -1 | EXECUTE test_prep(0::int); | QUERY_STATUS_ERROR + -1 | DEALLOCATE test_prep; | QUERY_STATUS_SUBMIT + -1 | DEALLOCATE test_prep; | QUERY_STATUS_DONE + -1 | RESET yagpcc.logging_mode; | QUERY_STATUS_SUBMIT +(8 rows) + +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + t +--- + t +(1 row) + +-- GUC Settings +SET yagpcc.logging_mode to 'TBL'; +SET yagpcc.report_nested_queries TO FALSE; +RESET yagpcc.report_nested_queries; 
+RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; + segid | query_text | query_status +-------+--------------------------------------------+--------------------- + -1 | | QUERY_STATUS_DONE + -1 | SET yagpcc.report_nested_queries TO FALSE; | QUERY_STATUS_SUBMIT + -1 | SET yagpcc.report_nested_queries TO FALSE; | QUERY_STATUS_DONE + -1 | RESET yagpcc.report_nested_queries; | QUERY_STATUS_SUBMIT + -1 | RESET yagpcc.report_nested_queries; | QUERY_STATUS_DONE + -1 | RESET yagpcc.logging_mode; | QUERY_STATUS_SUBMIT +(6 rows) + +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + t +--- + t +(1 row) + +DROP FUNCTION yagp_status_order(text); +DROP EXTENSION yagp_hooks_collector; +RESET yagpcc.enable; +RESET yagpcc.report_nested_queries; +RESET yagpcc.enable_utility; diff --git a/metric.md b/metric.md index 2d198391a67..5df56877edb 100644 --- a/metric.md +++ b/metric.md @@ -1,32 +1,33 @@ ## YAGP Hooks Collector Metrics -### States -A Postgres process goes through 4 executor functions to execute a query: -1) `ExecutorStart()` - resource allocation for the query. -2) `ExecutorRun()` - query execution. -3) `ExecutorFinish()` - cleanup. -4) `ExecutorEnd()` - cleanup. +### States +A Postgres process goes through 4 executor functions to execute a query: +1) `ExecutorStart()` - resource allocation for the query. +2) `ExecutorRun()` - query execution. +3) `ExecutorFinish()` - cleanup. +4) `ExecutorEnd()` - cleanup. 
-yagp-hooks-collector sends messages with 4 states, from _Dispatcher_ and/or _Execute_ processes: `submit`, `start`, `end`, `done`, in this order: +yagp-hooks-collector sends messages with 4 states, from _Dispatcher_ and/or _Execute_ processes: `submit`, `start`, `end`, `done`, in this order: ``` submit -> ExecutorStart() -> start -> ExecutorRun() -> ExecutorFinish() -> end -> ExecutorEnd() -> done ``` -### Key Points -- Some queries may skip the _end_ state, then the _end_ statistics is sent during _done_. -- If a query finishes with an error (`METRICS_QUERY_ERROR`), or is cancelled (`METRICS_QUERY_CANCELLED`), statistics is sent at _done_. -- Some statistics is calculated as the difference between the current global metric and the previous. The initial snapshot is taken at submit, and at _end_/_done_ the diff is calculated. -- Nested queries on _Dispatcher_ become top-level on _Execute_. -- Each process (_Dispatcher_/_Execute_) sends its own statistics. +### Key Points +- Some queries may skip the _end_ state, then the _end_ statistics is sent during _done_. +- If a query finishes with an error (`METRICS_QUERY_ERROR`), or is cancelled (`METRICS_QUERY_CANCELLED`), statistics is sent at _done_. +- Some statistics is calculated as the difference between the current global metric and the previous. The initial snapshot is taken at submit, and at _end_/_done_ the diff is calculated. +- Nested queries on _Dispatcher_ become top-level on _Execute_. +- Each process (_Dispatcher_/_Execute_) sends its own statistics -### Notations -- **S** = Submit event. -- **T** = Start event. -- **E** = End event. -- **D** = Done event. -- **DIFF** = current_value - submit_value (submit event). -- **ABS** = Absolute value, or where diff is not applicable, the value taken. -- **Local*** - Statistics that starts counting from zero for each new query. A nested query is also considered new. +### Notations +- **S** = Submit event. +- **T** = Start event. +- **E** = End event. 
+- **D** = Done event. +- **DIFF** = current_value - submit_value (submit event). +- **ABS** = Absolute value, or where diff is not applicable, the value taken. +- **Local*** - Statistics that starts counting from zero for each new query. A nested query is also considered new. +- **Node** - PG process, either a `Query Dispatcher` (on master) or an `Execute` (on segment). ### Statistics Table @@ -36,7 +37,7 @@ submit -> ExecutorStart() -> start -> ExecutorRun() -> ExecutorFinish() -> end - | `runningTimeSeconds` | double | E, D | DIFF | - | Node | + | + | seconds | Wall clock time | | `userTimeSeconds` | double | E, D | DIFF | - | Node | + | + | seconds | /proc/pid/stat utime | | `kernelTimeSeconds` | double | E, D | DIFF | - | Node | + | + | seconds | /proc/pid/stat stime | -| `vsize` | uint64 | E, D | ABS | - | Node | + | + | pages | /proc/pid/stat vsize | +| `vsize` | uint64 | E, D | ABS | - | Node | + | + | bytes | /proc/pid/stat vsize | | `rss` | uint64 | E, D | ABS | - | Node | + | + | pages | /proc/pid/stat rss | | `VmSizeKb` | uint64 | E, D | ABS | - | Node | + | + | KB | /proc/pid/status VmSize | | `VmPeakKb` | uint64 | E, D | ABS | - | Node | + | + | KB | /proc/pid/status VmPeak | @@ -108,13 +109,13 @@ submit -> ExecutorStart() -> start -> ExecutorRun() -> ExecutorFinish() -> end - | `userName` | string | All | ABS | - | Cluster | + | - | text | Session user | | `databaseName` | string | All | ABS | - | Cluster | + | - | text | Database name | | `rsgname` | string | All | ABS | - | Cluster | + | - | text | Resource group name | -| `analyze_text` | string | D | ABS | - | Cluster | + | - | text | EXPLAIN ANALYZE JSON | +| `analyze_text` | string | D | ABS | - | Cluster | + | - | text | EXPLAIN ANALYZE | | **AdditionalQueryInfo** | | | | | | | | | | | `nested_level` | int64 | All | ABS | - | Node | + | + | count | Current nesting level | | `error_message` | string | D | ABS | - | Node | + | + | text | Error message | | `slice_id` | int64 | All | ABS | - | 
Node | + | + | id | Slice ID | | **QueryKey** | | | | | | | | | | -| `tmid` | int32 | All | ABS | - | Node | + | + | id | Time ID | +| `tmid` | int32 | All | ABS | - | Node | + | + | id | Transaction start time | | `ssid` | int32 | All | ABS | - | Node | + | + | id | Session ID | | `ccnt` | int32 | All | ABS | - | Node | + | + | count | Command counter | | **SegmentKey** | | | | | | | | | | diff --git a/sql/yagp_cursors.sql b/sql/yagp_cursors.sql new file mode 100644 index 00000000000..5d5bde58110 --- /dev/null +++ b/sql/yagp_cursors.sql @@ -0,0 +1,83 @@ +CREATE EXTENSION yagp_hooks_collector; + +CREATE FUNCTION yagp_status_order(status text) +RETURNS integer +AS $$ +BEGIN + RETURN CASE status + WHEN 'QUERY_STATUS_SUBMIT' THEN 1 + WHEN 'QUERY_STATUS_START' THEN 2 + WHEN 'QUERY_STATUS_END' THEN 3 + WHEN 'QUERY_STATUS_DONE' THEN 4 + ELSE 999 + END; +END; +$$ LANGUAGE plpgsql IMMUTABLE; + +SET yagpcc.enable TO TRUE; +SET yagpcc.enable_utility TO TRUE; +SET yagpcc.report_nested_queries TO TRUE; + +-- DECLARE +SET yagpcc.logging_mode to 'TBL'; + +BEGIN; +DECLARE cursor_stats_0 CURSOR FOR SELECT 0; +CLOSE cursor_stats_0; +COMMIT; + +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + +-- DECLARE WITH HOLD +SET yagpcc.logging_mode to 'TBL'; + +BEGIN; +DECLARE cursor_stats_1 CURSOR WITH HOLD FOR SELECT 1; +CLOSE cursor_stats_1; +DECLARE cursor_stats_2 CURSOR WITH HOLD FOR SELECT 2; +CLOSE cursor_stats_2; +COMMIT; + +RESET yagpcc.logging_mode; + +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + +-- ROLLBACK +SET yagpcc.logging_mode to 'TBL'; + +BEGIN; +DECLARE cursor_stats_3 CURSOR FOR SELECT 1; +CLOSE cursor_stats_3; +DECLARE cursor_stats_4 CURSOR FOR 
SELECT 1; +ROLLBACK; + +RESET yagpcc.logging_mode; + +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + +-- FETCH +SET yagpcc.logging_mode to 'TBL'; + +BEGIN; +DECLARE cursor_stats_5 CURSOR WITH HOLD FOR SELECT 2; +DECLARE cursor_stats_6 CURSOR WITH HOLD FOR SELECT 3; +FETCH 1 IN cursor_stats_5; +FETCH 1 IN cursor_stats_6; +CLOSE cursor_stats_5; +CLOSE cursor_stats_6; +COMMIT; + +RESET yagpcc.logging_mode; + +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + +DROP FUNCTION yagp_status_order(text); +DROP EXTENSION yagp_hooks_collector; +RESET yagpcc.enable; +RESET yagpcc.report_nested_queries; +RESET yagpcc.enable_utility; diff --git a/sql/yagp_dist.sql b/sql/yagp_dist.sql new file mode 100644 index 00000000000..b837ef05335 --- /dev/null +++ b/sql/yagp_dist.sql @@ -0,0 +1,86 @@ +CREATE EXTENSION yagp_hooks_collector; + +CREATE OR REPLACE FUNCTION yagp_status_order(status text) +RETURNS integer +AS $$ +BEGIN + RETURN CASE status + WHEN 'QUERY_STATUS_SUBMIT' THEN 1 + WHEN 'QUERY_STATUS_START' THEN 2 + WHEN 'QUERY_STATUS_END' THEN 3 + WHEN 'QUERY_STATUS_DONE' THEN 4 + ELSE 999 + END; +END; +$$ LANGUAGE plpgsql IMMUTABLE; + +SET yagpcc.enable TO TRUE; +SET yagpcc.report_nested_queries TO TRUE; +SET yagpcc.enable_utility TO FALSE; + +-- Hash distributed table + +CREATE TABLE test_hash_dist (id int) DISTRIBUTED BY (id); +INSERT INTO test_hash_dist SELECT 1; + +SET yagpcc.logging_mode to 'TBL'; +SET optimizer_enable_direct_dispatch TO TRUE; +-- Direct dispatch is used here, only one segment is scanned. +select * from test_hash_dist where id = 1; +RESET optimizer_enable_direct_dispatch; + +RESET yagpcc.logging_mode; +-- Should see 8 rows. 
+SELECT segid, query_text, query_status FROM yagpcc.log ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + +SET yagpcc.logging_mode to 'TBL'; + +-- Scan all segments. +select * from test_hash_dist; + +DROP TABLE test_hash_dist; +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + +-- Replicated table +CREATE FUNCTION force_segments() RETURNS SETOF text AS $$ +BEGIN + RETURN NEXT 'seg'; +END; +$$ LANGUAGE plpgsql VOLATILE EXECUTE ON ALL SEGMENTS; + +CREATE TABLE test_replicated (id int) DISTRIBUTED REPLICATED; +INSERT INTO test_replicated SELECT 1; + +SET yagpcc.logging_mode to 'TBL'; +SELECT COUNT(*) FROM test_replicated, force_segments(); +DROP TABLE test_replicated; +DROP FUNCTION force_segments(); + +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + +-- Partially distributed table (2 numsegments) +SET allow_system_table_mods = ON; +CREATE TABLE test_partial_dist (id int, data text) DISTRIBUTED BY (id); +UPDATE gp_distribution_policy SET numsegments = 2 WHERE localoid = 'test_partial_dist'::regclass; +INSERT INTO test_partial_dist SELECT * FROM generate_series(1, 100); + +SET yagpcc.logging_mode to 'TBL'; +SELECT COUNT(*) FROM test_partial_dist; +RESET yagpcc.logging_mode; + +DROP TABLE test_partial_dist; +RESET allow_system_table_mods; +-- Should see 12 rows. 
+SELECT query_text, query_status FROM yagpcc.log ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + +DROP FUNCTION yagp_status_order(text); +DROP EXTENSION yagp_hooks_collector; +RESET yagpcc.enable; +RESET yagpcc.report_nested_queries; +RESET yagpcc.enable_utility; diff --git a/sql/yagp_select.sql b/sql/yagp_select.sql new file mode 100644 index 00000000000..4038c6b7b63 --- /dev/null +++ b/sql/yagp_select.sql @@ -0,0 +1,67 @@ +CREATE EXTENSION yagp_hooks_collector; + +CREATE OR REPLACE FUNCTION yagp_status_order(status text) +RETURNS integer +AS $$ +BEGIN + RETURN CASE status + WHEN 'QUERY_STATUS_SUBMIT' THEN 1 + WHEN 'QUERY_STATUS_START' THEN 2 + WHEN 'QUERY_STATUS_END' THEN 3 + WHEN 'QUERY_STATUS_DONE' THEN 4 + ELSE 999 + END; +END; +$$ LANGUAGE plpgsql IMMUTABLE; + +SET yagpcc.enable TO TRUE; +SET yagpcc.report_nested_queries TO TRUE; +SET yagpcc.enable_utility TO FALSE; + +-- Basic SELECT tests +SET yagpcc.logging_mode to 'TBL'; + +SELECT 1; +SELECT COUNT(*) FROM generate_series(1,10); + +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + +-- Transaction test +SET yagpcc.logging_mode to 'TBL'; + +BEGIN; +SELECT 1; +COMMIT; + +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + +-- CTE test +SET yagpcc.logging_mode to 'TBL'; + +WITH t AS (VALUES (1), (2)) +SELECT * FROM t; + +RESET yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + +-- Prepared statement test +SET yagpcc.logging_mode to 'TBL'; + +PREPARE test_stmt AS SELECT 1; +EXECUTE test_stmt; +DEALLOCATE test_stmt; + +RESET 
yagpcc.logging_mode; +SELECT segid, query_text, query_status FROM yagpcc.log ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + +DROP FUNCTION yagp_status_order(text); +DROP EXTENSION yagp_hooks_collector; +RESET yagpcc.enable; +RESET yagpcc.report_nested_queries; +RESET yagpcc.enable_utility; diff --git a/sql/yagp_utf8_trim.sql b/sql/yagp_utf8_trim.sql new file mode 100644 index 00000000000..c0fdcce24a5 --- /dev/null +++ b/sql/yagp_utf8_trim.sql @@ -0,0 +1,43 @@ +CREATE EXTENSION IF NOT EXISTS yagp_hooks_collector; + +CREATE OR REPLACE FUNCTION get_marked_query(marker TEXT) +RETURNS TEXT AS $$ + SELECT query_text + FROM yagpcc.log + WHERE query_text LIKE '%' || marker || '%' + ORDER BY datetime DESC + LIMIT 1 +$$ LANGUAGE sql VOLATILE; + +SET yagpcc.enable TO TRUE; + +-- Test 1: 1 byte chars +SET yagpcc.max_text_size to 19; +SET yagpcc.logging_mode to 'TBL'; +SELECT /*test1*/ 'HelloWorld'; +RESET yagpcc.logging_mode; +SELECT octet_length(get_marked_query('test1')) = 19 AS correct_length; + +-- Test 2: 2 byte chars +SET yagpcc.max_text_size to 19; +SET yagpcc.logging_mode to 'TBL'; +SELECT /*test2*/ 'РУССКИЙЯЗЫК'; +RESET yagpcc.logging_mode; +-- Character 'Р' has two bytes and cut in the middle => not included. +SELECT octet_length(get_marked_query('test2')) = 18 AS correct_length; + +-- Test 3: 4 byte chars +SET yagpcc.max_text_size to 21; +SET yagpcc.logging_mode to 'TBL'; +SELECT /*test3*/ '😀'; +RESET yagpcc.logging_mode; +-- Emoji has 4 bytes and cut before the last byte => not included. 
+SELECT octet_length(get_marked_query('test3')) = 18 AS correct_length; + +-- Cleanup +DROP FUNCTION get_marked_query(TEXT); +RESET yagpcc.max_text_size; +RESET yagpcc.logging_mode; +RESET yagpcc.enable; + +DROP EXTENSION yagp_hooks_collector; diff --git a/sql/yagp_utility.sql b/sql/yagp_utility.sql new file mode 100644 index 00000000000..b4cca6f5421 --- /dev/null +++ b/sql/yagp_utility.sql @@ -0,0 +1,133 @@ +CREATE EXTENSION yagp_hooks_collector; + +CREATE OR REPLACE FUNCTION yagp_status_order(status text) +RETURNS integer +AS $$ +BEGIN + RETURN CASE status + WHEN 'QUERY_STATUS_SUBMIT' THEN 1 + WHEN 'QUERY_STATUS_START' THEN 2 + WHEN 'QUERY_STATUS_END' THEN 3 + WHEN 'QUERY_STATUS_DONE' THEN 4 + ELSE 999 + END; +END; +$$ LANGUAGE plpgsql IMMUTABLE; + +SET yagpcc.enable TO TRUE; +SET yagpcc.enable_utility TO TRUE; +SET yagpcc.report_nested_queries TO TRUE; + +SET yagpcc.logging_mode to 'TBL'; + +CREATE TABLE test_table (a int, b text); +CREATE INDEX test_idx ON test_table(a); +ALTER TABLE test_table ADD COLUMN c int DEFAULT 1; +DROP TABLE test_table; + +RESET yagpcc.logging_mode; + +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + +-- Partitioning +SET yagpcc.logging_mode to 'TBL'; + +CREATE TABLE pt_test (a int, b int) +DISTRIBUTED BY (a) +PARTITION BY RANGE (a) +(START (0) END (100) EVERY (50)); +DROP TABLE pt_test; + +RESET yagpcc.logging_mode; + +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + +-- Views and Functions +SET yagpcc.logging_mode to 'TBL'; + +CREATE VIEW test_view AS SELECT 1 AS a; +CREATE FUNCTION test_func(i int) RETURNS int AS $$ SELECT $1 + 1; $$ LANGUAGE SQL; +DROP VIEW test_view; +DROP FUNCTION test_func(int); + +RESET yagpcc.logging_mode; + 
+SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + +-- Transaction Operations +SET yagpcc.logging_mode to 'TBL'; + +BEGIN; +SAVEPOINT sp1; +ROLLBACK TO sp1; +COMMIT; + +BEGIN; +SAVEPOINT sp2; +ABORT; + +BEGIN; +ROLLBACK; + +RESET yagpcc.logging_mode; + +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + +-- DML Operations +SET yagpcc.logging_mode to 'TBL'; + +CREATE TABLE dml_test (a int, b text); +INSERT INTO dml_test VALUES (1, 'test'); +UPDATE dml_test SET b = 'updated' WHERE a = 1; +DELETE FROM dml_test WHERE a = 1; +DROP TABLE dml_test; + +RESET yagpcc.logging_mode; + +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + +-- COPY Operations +SET yagpcc.logging_mode to 'TBL'; + +CREATE TABLE copy_test (a int); +COPY (SELECT 1) TO STDOUT; +DROP TABLE copy_test; + +RESET yagpcc.logging_mode; + +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + +-- Prepared Statements and error during execute +SET yagpcc.logging_mode to 'TBL'; + +PREPARE test_prep(int) AS SELECT $1/0 AS value; +EXECUTE test_prep(0::int); +DEALLOCATE test_prep; + +RESET yagpcc.logging_mode; + +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + +-- GUC Settings +SET yagpcc.logging_mode to 'TBL'; + +SET yagpcc.report_nested_queries TO FALSE; +RESET 
yagpcc.report_nested_queries; + +RESET yagpcc.logging_mode; + +SELECT segid, query_text, query_status FROM yagpcc.log WHERE segid = -1 AND utility = true ORDER BY segid, ccnt, yagp_status_order(query_status) ASC; +SELECT yagpcc.truncate_log() IS NOT NULL AS t; + +DROP FUNCTION yagp_status_order(text); +DROP EXTENSION yagp_hooks_collector; +RESET yagpcc.enable; +RESET yagpcc.report_nested_queries; +RESET yagpcc.enable_utility; diff --git a/src/Config.cpp b/src/Config.cpp index aef09fc7d73..dbd7e25b483 100644 --- a/src/Config.cpp +++ b/src/Config.cpp @@ -16,9 +16,16 @@ static bool guc_enable_cdbstats = true; static bool guc_enable_collector = true; static bool guc_report_nested_queries = true; static char *guc_ignored_users = nullptr; -static int guc_max_text_size = 1024; // in KB -static int guc_max_plan_size = 1024; // in KB -static int guc_min_analyze_time = -1; // uninitialized state +static int guc_max_text_size = 1 << 20; // in bytes (1MB) +static int guc_max_plan_size = 1024; // in KB +static int guc_min_analyze_time = 10000; // in ms +static int guc_logging_mode = LOG_MODE_UDS; +static bool guc_enable_utility = false; + +static const struct config_enum_entry logging_mode_options[] = { + {"uds", LOG_MODE_UDS, false /* hidden */}, + {"tbl", LOG_MODE_TBL, false}, + {NULL, 0, false}}; static std::unique_ptr<std::unordered_set<std::string>> ignored_users_set = nullptr; @@ -92,9 +99,9 @@ void Config::init() { DefineCustomIntVariable( "yagpcc.max_text_size", - "Make yagpcc trim query texts longer than configured size", NULL, - &guc_max_text_size, 1024, 0, INT_MAX / 1024, PGC_SUSET, - GUC_NOT_IN_SAMPLE | GUC_GPDB_NEED_SYNC | GUC_UNIT_KB, NULL, NULL, NULL); + "Make yagpcc trim query texts longer than configured size in bytes", NULL, + &guc_max_text_size, 1 << 20 /* 1MB */, 0, INT_MAX, PGC_SUSET, + GUC_NOT_IN_SAMPLE | GUC_GPDB_NEED_SYNC, NULL, NULL, NULL); DefineCustomIntVariable( "yagpcc.max_plan_size", @@ -106,18 +113,31 @@ void Config::init() { 
"yagpcc.min_analyze_time", "Sets the minimum execution time above which plans will be logged.", "Zero prints all plans. -1 turns this feature off.", - &guc_min_analyze_time, -1, -1, INT_MAX, PGC_USERSET, + &guc_min_analyze_time, 10000, -1, INT_MAX, PGC_USERSET, GUC_NOT_IN_SAMPLE | GUC_GPDB_NEED_SYNC | GUC_UNIT_MS, NULL, NULL, NULL); + + DefineCustomEnumVariable( + "yagpcc.logging_mode", "Logging mode: UDS or PG Table", NULL, + &guc_logging_mode, LOG_MODE_UDS, logging_mode_options, PGC_SUSET, + GUC_NOT_IN_SAMPLE | GUC_GPDB_NEED_SYNC | GUC_SUPERUSER_ONLY, NULL, NULL, + NULL); + + DefineCustomBoolVariable( + "yagpcc.enable_utility", "Collect utility statement stats", NULL, + &guc_enable_utility, false, PGC_USERSET, + GUC_NOT_IN_SAMPLE | GUC_GPDB_NEED_SYNC, NULL, NULL, NULL); } std::string Config::uds_path() { return guc_uds_path; } bool Config::enable_analyze() { return guc_enable_analyze; } bool Config::enable_cdbstats() { return guc_enable_cdbstats; } bool Config::enable_collector() { return guc_enable_collector; } +bool Config::enable_utility() { return guc_enable_utility; } bool Config::report_nested_queries() { return guc_report_nested_queries; } -size_t Config::max_text_size() { return guc_max_text_size * 1024; } +size_t Config::max_text_size() { return guc_max_text_size; } size_t Config::max_plan_size() { return guc_max_plan_size * 1024; } int Config::min_analyze_time() { return guc_min_analyze_time; }; +int Config::logging_mode() { return guc_logging_mode; } bool Config::filter_user(std::string username) { if (!ignored_users_set) { diff --git a/src/Config.h b/src/Config.h index eff83f0960a..7501c727a44 100644 --- a/src/Config.h +++ b/src/Config.h @@ -2,6 +2,9 @@ #include <string> +#define LOG_MODE_UDS 0 +#define LOG_MODE_TBL 1 + class Config { public: static void init(); @@ -9,10 +12,12 @@ public: static bool enable_analyze(); static bool enable_cdbstats(); static bool enable_collector(); + static bool enable_utility(); static bool filter_user(std::string 
username); static bool report_nested_queries(); static size_t max_text_size(); static size_t max_plan_size(); static int min_analyze_time(); + static int logging_mode(); static void sync(); }; \ No newline at end of file diff --git a/src/EventSender.cpp b/src/EventSender.cpp index 133d409b574..fee435a6dcc 100644 --- a/src/EventSender.cpp +++ b/src/EventSender.cpp @@ -1,6 +1,7 @@ #include "Config.h" #include "UDSConnector.h" #include "memory/gpdbwrappers.h" +#include "log/LogOps.h" #define typeid __typeid extern "C" { @@ -24,10 +25,82 @@ extern "C" { (Gp_role == GP_ROLE_DISPATCH && Config::min_analyze_time() >= 0 && \ Config::enable_analyze()) -void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) { +static bool enable_utility = Config::enable_utility(); + +bool EventSender::verify_query(QueryDesc *query_desc, QueryState state, + bool utility) { + if (!proto_verified) { + return false; + } if (Gp_role != GP_ROLE_DISPATCH && Gp_role != GP_ROLE_EXECUTE) { - return; + return false; + } + + switch (state) { + case QueryState::SUBMIT: + // Cache enable_utility at SUBMIT to ensure consistent behavior at DONE. + // Without caching, a query that sets enable_utility to false from true + // would be accepted at SUBMIT (guc is true) but rejected at DONE (guc + // is false), causing a leak. + enable_utility = Config::enable_utility(); + if (utility && enable_utility == false) { + return false; + } + // Sync config in case current query changes it. + Config::sync(); + // Register qkey for a nested query we won't report, + // so we can detect nesting_level > 0 and skip reporting at end/done. 
+ if (!need_report_nested_query() && nesting_level > 0) { + QueryKey::register_qkey(query_desc, nesting_level); + return false; + } + if (is_top_level_query(query_desc, nesting_level)) { + nested_timing = 0; + nested_calls = 0; + } + break; + case QueryState::START: + if (!qdesc_submitted(query_desc)) { + collect_query_submit(query_desc, false /* utility */); + } + break; + case QueryState::DONE: + if (utility && enable_utility == false) { + return false; + } + default: + break; + } + + if (filter_query(query_desc)) { + return false; + } + if (!nesting_is_valid(query_desc, nesting_level)) { + return false; + } + + return true; +} + +bool EventSender::log_query_req(const yagpcc::SetQueryReq &req, + const std::string &event, bool utility) { + bool clear_big_fields = false; + switch (Config::logging_mode()) { + case LOG_MODE_UDS: + clear_big_fields = UDSConnector::report_query(req, event); + break; + case LOG_MODE_TBL: + ya_gpdb::insert_log(req, utility); + clear_big_fields = false; + break; + default: + Assert(false); } + return clear_big_fields; +} + +void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg, + bool utility, ErrorData *edata) { auto *query_desc = reinterpret_cast<QueryDesc *>(arg); switch (status) { case METRICS_PLAN_NODE_INITIALIZE: @@ -36,7 +109,7 @@ void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) { // TODO break; case METRICS_QUERY_SUBMIT: - collect_query_submit(query_desc); + collect_query_submit(query_desc, utility); break; case METRICS_QUERY_START: // no-op: executor_after_start is enough @@ -50,7 +123,7 @@ void EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) { case METRICS_QUERY_ERROR: case METRICS_QUERY_CANCELED: case METRICS_INNER_QUERY_DONE: - collect_query_done(query_desc, status); + collect_query_done(query_desc, utility, status, edata); break; default: ereport(FATAL, (errmsg("Unknown query status: %d", status))); @@ -58,18 +131,10 @@ void 
EventSender::query_metrics_collect(QueryMetricsStatus status, void *arg) { } void EventSender::executor_before_start(QueryDesc *query_desc, int eflags) { - if (!connector) { - return; - } - if (filter_query(query_desc)) { - return; - } - if (!qdesc_submitted(query_desc)) { - collect_query_submit(query_desc); - } - if (!need_collect(query_desc, nesting_level)) { + if (!verify_query(query_desc, QueryState::START, false /* utility*/)) { return; } + if (Gp_role == GP_ROLE_DISPATCH && Config::enable_analyze() && (eflags & EXEC_FLAG_EXPLAIN_ONLY) == 0) { query_desc->instrument_options |= INSTRUMENT_BUFFERS; @@ -88,16 +153,14 @@ void EventSender::executor_before_start(QueryDesc *query_desc, int eflags) { } void EventSender::executor_after_start(QueryDesc *query_desc, int /* eflags*/) { - if (!connector || !need_collect(query_desc, nesting_level)) { - return; - } - if (Gp_role != GP_ROLE_DISPATCH && Gp_role != GP_ROLE_EXECUTE) { + if (!verify_query(query_desc, QueryState::START, false /* utility */)) { return; } + auto &query = get_query(query_desc); auto query_msg = query.message.get(); *query_msg->mutable_start_time() = current_ts(); - update_query_state(query, QueryState::START); + update_query_state(query, QueryState::START, false /* utility */); set_query_plan(query_msg, query_desc); if (need_collect_analyze()) { // Set up to track total elapsed time during query run. 
@@ -112,52 +175,37 @@ void EventSender::executor_after_start(QueryDesc *query_desc, int /* eflags*/) { } yagpcc::GPMetrics stats; std::swap(stats, *query_msg->mutable_query_metrics()); - if (connector->report_query(*query_msg, "started")) { + if (log_query_req(*query_msg, "started", false /* utility */)) { clear_big_fields(query_msg); } std::swap(stats, *query_msg->mutable_query_metrics()); } void EventSender::executor_end(QueryDesc *query_desc) { - if (!connector || !need_collect(query_desc, nesting_level)) { - return; - } - if (Gp_role != GP_ROLE_DISPATCH && Gp_role != GP_ROLE_EXECUTE) { + if (!verify_query(query_desc, QueryState::END, false /* utility */)) { return; } + auto &query = get_query(query_desc); auto *query_msg = query.message.get(); *query_msg->mutable_end_time() = current_ts(); - update_query_state(query, QueryState::END); + update_query_state(query, QueryState::END, false /* utility */); if (is_top_level_query(query_desc, nesting_level)) { set_gp_metrics(query_msg->mutable_query_metrics(), query_desc, nested_calls, nested_timing); } else { set_gp_metrics(query_msg->mutable_query_metrics(), query_desc, 0, 0); } - if (connector->report_query(*query_msg, "ended")) { + if (log_query_req(*query_msg, "ended", false /* utility */)) { clear_big_fields(query_msg); } } -void EventSender::collect_query_submit(QueryDesc *query_desc) { - if (!connector) { - return; - } - Config::sync(); - // Register qkey for a nested query we won't report, - // so we can detect nesting_level > 0 and skip reporting at end/done. 
- if (!need_report_nested_query() && nesting_level > 0) { - QueryKey::register_qkey(query_desc, nesting_level); - return; - } - if (is_top_level_query(query_desc, nesting_level)) { - nested_timing = 0; - nested_calls = 0; - } - if (!need_collect(query_desc, nesting_level)) { +void EventSender::collect_query_submit(QueryDesc *query_desc, bool utility) { + if (!verify_query(query_desc, QueryState::SUBMIT, utility)) { return; } + submit_query(query_desc); auto &query = get_query(query_desc); auto *query_msg = query.message.get(); @@ -167,7 +215,7 @@ void EventSender::collect_query_submit(QueryDesc *query_desc) { set_qi_nesting_level(query_msg, nesting_level); set_qi_slice_id(query_msg); set_query_text(query_msg, query_desc); - if (connector->report_query(*query_msg, "submit")) { + if (log_query_req(*query_msg, "submit", utility)) { clear_big_fields(query_msg); } // take initial metrics snapshot so that we can safely take diff afterwards @@ -182,7 +230,8 @@ void EventSender::collect_query_submit(QueryDesc *query_desc) { } void EventSender::report_query_done(QueryDesc *query_desc, QueryItem &query, - QueryMetricsStatus status) { + QueryMetricsStatus status, bool utility, + ErrorData *edata) { yagpcc::QueryStatus query_status; std::string msg; switch (status) { @@ -211,12 +260,20 @@ void EventSender::report_query_done(QueryDesc *query_desc, QueryItem &query, (errmsg("Unexpected query status in query_done hook: %d", status))); } auto prev_state = query.state; - update_query_state(query, QueryState::DONE, + update_query_state(query, QueryState::DONE, utility, query_status == yagpcc::QueryStatus::QUERY_STATUS_DONE); auto query_msg = query.message.get(); query_msg->set_query_status(query_status); if (status == METRICS_QUERY_ERROR) { - set_qi_error_message(query_msg); + bool error_flushed = elog_message() == NULL; + if (error_flushed && edata->message == NULL) { + ereport(WARNING, (errmsg("YAGPCC missing error message"))); + ereport(DEBUG3, + (errmsg("YAGPCC query sourceText: 
%s", query_desc->sourceText))); + } else { + set_qi_error_message(query_msg, + error_flushed ? edata->message : elog_message()); + } } if (prev_state == START) { // We've missed ExecutorEnd call due to query cancel or error. It's @@ -230,12 +287,13 @@ void EventSender::report_query_done(QueryDesc *query_desc, QueryItem &query, set_ic_stats(query_msg->mutable_query_metrics()->mutable_instrumentation(), &ic_statistics); #endif - connector->report_query(*query_msg, msg); + (void)log_query_req(*query_msg, msg, utility); } -void EventSender::collect_query_done(QueryDesc *query_desc, - QueryMetricsStatus status) { - if (!connector || !need_collect(query_desc, nesting_level)) { +void EventSender::collect_query_done(QueryDesc *query_desc, bool utility, + QueryMetricsStatus status, + ErrorData *edata) { + if (!verify_query(query_desc, QueryState::DONE, utility)) { return; } @@ -258,10 +316,7 @@ void EventSender::collect_query_done(QueryDesc *query_desc, } auto &query = get_query(query_desc); - bool report = need_report_nested_query() || - is_top_level_query(query_desc, nesting_level); - if (report) - report_query_done(query_desc, query, status); + report_query_done(query_desc, query, status, utility, edata); if (need_report_nested_query()) update_nested_counters(query_desc); @@ -276,7 +331,7 @@ void EventSender::ic_metrics_collect() { if (Gp_interconnect_type != INTERCONNECT_TYPE_UDPIFC) { return; } - if (!connector || gp_command_count == 0 || !Config::enable_collector() || + if (!proto_verified || gp_command_count == 0 || !Config::enable_collector() || Config::filter_user(get_user_name())) { return; } @@ -305,15 +360,12 @@ void EventSender::ic_metrics_collect() { } void EventSender::analyze_stats_collect(QueryDesc *query_desc) { - if (!connector || Gp_role != GP_ROLE_DISPATCH) { + if (!verify_query(query_desc, QueryState::END, false /* utility */)) { return; } - if (!need_collect(query_desc, nesting_level)) { + if (Gp_role != GP_ROLE_DISPATCH) { return; } - auto &query = 
get_query(query_desc); - auto *query_msg = query.message.get(); - *query_msg->mutable_end_time() = current_ts(); if (!query_desc->totaltime || !need_collect_analyze()) { return; } @@ -323,14 +375,17 @@ void EventSender::analyze_stats_collect(QueryDesc *query_desc) { double ms = query_desc->totaltime->total * 1000.0; if (ms >= Config::min_analyze_time()) { - set_analyze_plan_text_json(query_desc, query_msg); + auto &query = get_query(query_desc); + auto *query_msg = query.message.get(); + set_analyze_plan_text(query_desc, query_msg); } } EventSender::EventSender() { if (Config::enable_collector()) { try { - connector = new UDSConnector(); + GOOGLE_PROTOBUF_VERIFY_VERSION; + proto_verified = true; } catch (const std::exception &e) { ereport(INFO, (errmsg("Unable to start query tracing %s", e.what()))); } @@ -342,18 +397,16 @@ EventSender::EventSender() { EventSender::~EventSender() { for (const auto &[qkey, _] : queries) { - ereport(LOG, - (errmsg("YAGPCC query with missing done event: " - "tmid=%d ssid=%d ccnt=%d nlvl=%d", - qkey.tmid, qkey.ssid, qkey.ccnt, qkey.nesting_level))); + ereport(LOG, (errmsg("YAGPCC query with missing done event: " + "tmid=%d ssid=%d ccnt=%d nlvl=%d", + qkey.tmid, qkey.ssid, qkey.ccnt, qkey.nesting_level))); } - delete connector; } // That's basically a very simplistic state machine to fix or highlight any bugs // coming from GP void EventSender::update_query_state(QueryItem &query, QueryState new_state, - bool success) { + bool utility, bool success) { switch (new_state) { case QueryState::SUBMIT: Assert(false); @@ -372,7 +425,7 @@ void EventSender::update_query_state(QueryItem &query, QueryState new_state, query.message->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_END); break; case QueryState::DONE: - Assert(query.state == QueryState::END || !success); + Assert(query.state == QueryState::END || !success || utility); query.message->set_query_status(yagpcc::QueryStatus::QUERY_STATUS_DONE); break; default: diff --git 
a/src/EventSender.h b/src/EventSender.h index 4071d580ff9..4afdf1e14a4 100644 --- a/src/EventSender.h +++ b/src/EventSender.h @@ -87,7 +87,8 @@ public: void executor_before_start(QueryDesc *query_desc, int eflags); void executor_after_start(QueryDesc *query_desc, int eflags); void executor_end(QueryDesc *query_desc); - void query_metrics_collect(QueryMetricsStatus status, void *arg); + void query_metrics_collect(QueryMetricsStatus status, void *arg, bool utility, + ErrorData *edata = NULL); void ic_metrics_collect(); void analyze_stats_collect(QueryDesc *query_desc); void incr_depth() { nesting_level++; } @@ -105,18 +106,23 @@ private: explicit QueryItem(QueryState st); }; - void update_query_state(QueryItem &query, QueryState new_state, + static bool log_query_req(const yagpcc::SetQueryReq &req, + const std::string &event, bool utility); + bool verify_query(QueryDesc *query_desc, QueryState state, bool utility); + void update_query_state(QueryItem &query, QueryState new_state, bool utility, bool success = true); QueryItem &get_query(QueryDesc *query_desc); void submit_query(QueryDesc *query_desc); - void collect_query_submit(QueryDesc *query_desc); + void collect_query_submit(QueryDesc *query_desc, bool utility); void report_query_done(QueryDesc *query_desc, QueryItem &query, - QueryMetricsStatus status); - void collect_query_done(QueryDesc *query_desc, QueryMetricsStatus status); + QueryMetricsStatus status, bool utility, + ErrorData *edata = NULL); + void collect_query_done(QueryDesc *query_desc, bool utility, + QueryMetricsStatus status, ErrorData *edata = NULL); void update_nested_counters(QueryDesc *query_desc); bool qdesc_submitted(QueryDesc *query_desc); - UDSConnector *connector = nullptr; + bool proto_verified = false; int nesting_level = 0; int64_t nested_calls = 0; double nested_timing = 0; diff --git a/src/PgUtils.cpp b/src/PgUtils.cpp index 929f0cf2681..fc58112bfaa 100644 --- a/src/PgUtils.cpp +++ b/src/PgUtils.cpp @@ -79,8 +79,3 @@ bool 
filter_query(QueryDesc *query_desc) { return gp_command_count == 0 || query_desc->sourceText == nullptr || !Config::enable_collector() || Config::filter_user(get_user_name()); } - -bool need_collect(QueryDesc *query_desc, int nesting_level) { - return !filter_query(query_desc) && - nesting_is_valid(query_desc, nesting_level); -} diff --git a/src/PgUtils.h b/src/PgUtils.h index ceb07c2e8e5..02f084c597a 100644 --- a/src/PgUtils.h +++ b/src/PgUtils.h @@ -12,6 +12,3 @@ bool is_top_level_query(QueryDesc *query_desc, int nesting_level); bool nesting_is_valid(QueryDesc *query_desc, int nesting_level); bool need_report_nested_query(); bool filter_query(QueryDesc *query_desc); -bool need_collect(QueryDesc *query_desc, int nesting_level); -ExplainState get_explain_state(QueryDesc *query_desc, bool costs); -ExplainState get_analyze_state_json(QueryDesc *query_desc, bool analyze); diff --git a/src/ProtoUtils.cpp b/src/ProtoUtils.cpp index 4655433c806..f28714da6ec 100644 --- a/src/ProtoUtils.cpp +++ b/src/ProtoUtils.cpp @@ -24,6 +24,18 @@ extern "C" { #include <ctime> #include <string> +namespace { +constexpr uint8_t UTF8_CONTINUATION_BYTE_MASK = (1 << 7) | (1 << 6); +constexpr uint8_t UTF8_CONTINUATION_BYTE = (1 << 7); +constexpr uint8_t UTF8_MAX_SYMBOL_BYTES = 4; + +// Returns true if byte is the starting byte of utf8 +// character, false if byte is the continuation (10xxxxxx). 
+inline bool utf8_start_byte(uint8_t byte) { + return (byte & UTF8_CONTINUATION_BYTE_MASK) != UTF8_CONTINUATION_BYTE; +} +} // namespace + google::protobuf::Timestamp current_ts() { google::protobuf::Timestamp current_ts; struct timeval tv; @@ -46,9 +58,26 @@ void set_segment_key(yagpcc::SegmentKey *key) { key->set_segindex(GpIdentity.segindex); } -inline std::string char_to_trimmed_str(const char *str, size_t len, - size_t lim) { - return std::string(str, std::min(len, lim)); +std::string trim_str_shrink_utf8(const char *str, size_t len, size_t lim) { + if (unlikely(str == nullptr)) { + return std::string(); + } + if (likely(len <= lim || GetDatabaseEncoding() != PG_UTF8)) { + return std::string(str, std::min(len, lim)); + } + + // Handle trimming of utf8 correctly, do not cut multi-byte characters. + size_t cut_pos = lim; + size_t visited_bytes = 1; + while (visited_bytes < UTF8_MAX_SYMBOL_BYTES && cut_pos > 0) { + if (utf8_start_byte(static_cast<uint8_t>(str[cut_pos]))) { + break; + } + ++visited_bytes; + --cut_pos; + } + + return std::string(str, cut_pos); } void set_query_plan(yagpcc::SetQueryReq *req, QueryDesc *query_desc) { @@ -61,10 +90,10 @@ void set_query_plan(yagpcc::SetQueryReq *req, QueryDesc *query_desc) { ya_gpdb::mem_ctx_switch_to(query_desc->estate->es_query_cxt); ExplainState es = ya_gpdb::get_explain_state(query_desc, true); if (es.str) { - *qi->mutable_plan_text() = char_to_trimmed_str(es.str->data, es.str->len, - Config::max_plan_size()); + *qi->mutable_plan_text() = trim_str_shrink_utf8(es.str->data, es.str->len, + Config::max_plan_size()); StringInfo norm_plan = ya_gpdb::gen_normplan(es.str->data); - *qi->mutable_template_plan_text() = char_to_trimmed_str( + *qi->mutable_template_plan_text() = trim_str_shrink_utf8( norm_plan->data, norm_plan->len, Config::max_plan_size()); qi->set_plan_id( hash_any((unsigned char *)norm_plan->data, norm_plan->len)); @@ -79,11 +108,11 @@ void set_query_plan(yagpcc::SetQueryReq *req, QueryDesc *query_desc) { 
void set_query_text(yagpcc::SetQueryReq *req, QueryDesc *query_desc) { if (Gp_session_role == GP_ROLE_DISPATCH && query_desc->sourceText) { auto qi = req->mutable_query_info(); - *qi->mutable_query_text() = char_to_trimmed_str( + *qi->mutable_query_text() = trim_str_shrink_utf8( query_desc->sourceText, strlen(query_desc->sourceText), Config::max_text_size()); char *norm_query = ya_gpdb::gen_normquery(query_desc->sourceText); - *qi->mutable_template_query_text() = char_to_trimmed_str( + *qi->mutable_template_query_text() = trim_str_shrink_utf8( norm_query, strlen(norm_query), Config::max_text_size()); } } @@ -103,7 +132,8 @@ void set_query_info(yagpcc::SetQueryReq *req) { if (Gp_session_role == GP_ROLE_DISPATCH) { auto qi = req->mutable_query_info(); qi->set_username(get_user_name()); - qi->set_databasename(get_db_name()); + if (IsTransactionState()) + qi->set_databasename(get_db_name()); qi->set_rsgname(get_rg_name()); } } @@ -118,11 +148,10 @@ void set_qi_slice_id(yagpcc::SetQueryReq *req) { aqi->set_slice_id(currentSliceId); } -void set_qi_error_message(yagpcc::SetQueryReq *req) { +void set_qi_error_message(yagpcc::SetQueryReq *req, const char *err_msg) { auto aqi = req->mutable_add_info(); - auto error = elog_message(); *aqi->mutable_error_message() = - char_to_trimmed_str(error, strlen(error), Config::max_text_size()); + trim_str_shrink_utf8(err_msg, strlen(err_msg), Config::max_text_size()); } void set_metric_instrumentation(yagpcc::MetricInstrumentation *metrics, @@ -226,8 +255,7 @@ double protots_to_double(const google::protobuf::Timestamp &ts) { return double(ts.seconds()) + double(ts.nanos()) / 1000000000.0; } -void set_analyze_plan_text_json(QueryDesc *query_desc, - yagpcc::SetQueryReq *req) { +void set_analyze_plan_text(QueryDesc *query_desc, yagpcc::SetQueryReq *req) { // Make sure it is a valid txn and it is not an utility // statement for ExplainPrintPlan() later. 
if (!IsTransactionState() || !query_desc->plannedstmt) { @@ -235,7 +263,7 @@ void set_analyze_plan_text_json(QueryDesc *query_desc, } MemoryContext oldcxt = ya_gpdb::mem_ctx_switch_to(query_desc->estate->es_query_cxt); - ExplainState es = ya_gpdb::get_analyze_state_json( + ExplainState es = ya_gpdb::get_analyze_state( query_desc, query_desc->instrument_options && Config::enable_analyze()); ya_gpdb::mem_ctx_switch_to(oldcxt); if (es.str) { @@ -243,14 +271,9 @@ void set_analyze_plan_text_json(QueryDesc *query_desc, if (es.str->len > 0 && es.str->data[es.str->len - 1] == '\n') { es.str->data[--es.str->len] = '\0'; } - // Convert JSON array to JSON object. - if (es.str->len > 0) { - es.str->data[0] = '{'; - es.str->data[es.str->len - 1] = '}'; - } - auto trimmed_analyze = - char_to_trimmed_str(es.str->data, es.str->len, Config::max_plan_size()); + auto trimmed_analyze = trim_str_shrink_utf8(es.str->data, es.str->len, + Config::max_plan_size()); req->mutable_query_info()->set_analyze_text(trimmed_analyze); ya_gpdb::pfree(es.str->data); } -} \ No newline at end of file +} diff --git a/src/ProtoUtils.h b/src/ProtoUtils.h index 8287b3de7ea..725a634f765 100644 --- a/src/ProtoUtils.h +++ b/src/ProtoUtils.h @@ -12,12 +12,11 @@ void clear_big_fields(yagpcc::SetQueryReq *req); void set_query_info(yagpcc::SetQueryReq *req); void set_qi_nesting_level(yagpcc::SetQueryReq *req, int nesting_level); void set_qi_slice_id(yagpcc::SetQueryReq *req); -void set_qi_error_message(yagpcc::SetQueryReq *req); +void set_qi_error_message(yagpcc::SetQueryReq *req, const char *err_msg); void set_gp_metrics(yagpcc::GPMetrics *metrics, QueryDesc *query_desc, int nested_calls, double nested_time); void set_ic_stats(yagpcc::MetricInstrumentation *metrics, const ICStatistics *ic_statistics); yagpcc::SetQueryReq create_query_req(yagpcc::QueryStatus status); double protots_to_double(const google::protobuf::Timestamp &ts); -void set_analyze_plan_text_json(QueryDesc *query_desc, - yagpcc::SetQueryReq 
*message); \ No newline at end of file +void set_analyze_plan_text(QueryDesc *query_desc, yagpcc::SetQueryReq *message); \ No newline at end of file diff --git a/src/UDSConnector.cpp b/src/UDSConnector.cpp index f8c4586126d..b6af303218d 100644 --- a/src/UDSConnector.cpp +++ b/src/UDSConnector.cpp @@ -2,6 +2,7 @@ #include "Config.h" #include "YagpStat.h" #include "memory/gpdbwrappers.h" +#include "log/LogOps.h" #include <string> #include <unistd.h> @@ -16,8 +17,6 @@ extern "C" { #include "postgres.h" } -UDSConnector::UDSConnector() { GOOGLE_PROTOBUF_VERIFY_VERSION; } - static void inline log_tracing_failure(const yagpcc::SetQueryReq &req, const std::string &event) { ereport(LOG, diff --git a/src/UDSConnector.h b/src/UDSConnector.h index 67504fc8529..f0dfcb77a3f 100644 --- a/src/UDSConnector.h +++ b/src/UDSConnector.h @@ -4,6 +4,6 @@ class UDSConnector { public: - UDSConnector(); - bool report_query(const yagpcc::SetQueryReq &req, const std::string &event); + bool static report_query(const yagpcc::SetQueryReq &req, + const std::string &event); }; \ No newline at end of file diff --git a/src/hook_wrappers.cpp b/src/hook_wrappers.cpp index d76b7c64e10..07ac511d546 100644 --- a/src/hook_wrappers.cpp +++ b/src/hook_wrappers.cpp @@ -32,6 +32,7 @@ static analyze_stats_collect_hook_type previous_analyze_stats_collect_hook = #ifdef IC_TEARDOWN_HOOK static ic_teardown_hook_type previous_ic_teardown_hook = nullptr; #endif +static ProcessUtility_hook_type previous_ProcessUtility_hook = nullptr; static void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags); static void ya_ExecutorRun_hook(QueryDesc *query_desc, ScanDirection direction, @@ -44,6 +45,10 @@ static void ya_ic_teardown_hook(ChunkTransportState *transportStates, #ifdef ANALYZE_STATS_COLLECT_HOOK static void ya_analyze_stats_collect_hook(QueryDesc *query_desc); #endif +static void ya_process_utility_hook(Node *parsetree, const char *queryString, + ProcessUtilityContext context, + ParamListInfo params, 
DestReceiver *dest, + char *completionTag); static EventSender *sender = nullptr; @@ -85,6 +90,8 @@ void hooks_init() { analyze_stats_collect_hook = ya_analyze_stats_collect_hook; #endif stat_statements_parser_init(); + previous_ProcessUtility_hook = ProcessUtility_hook; + ProcessUtility_hook = ya_process_utility_hook; } void hooks_deinit() { @@ -104,6 +111,7 @@ void hooks_deinit() { delete sender; } YagpStat::deinit(); + ProcessUtility_hook = previous_ProcessUtility_hook; } void ya_ExecutorStart_hook(QueryDesc *query_desc, int eflags) { @@ -165,7 +173,8 @@ void ya_ExecutorEnd_hook(QueryDesc *query_desc) { } void ya_query_info_collect_hook(QueryMetricsStatus status, void *arg) { - cpp_call(get_sender(), &EventSender::query_metrics_collect, status, arg); + cpp_call(get_sender(), &EventSender::query_metrics_collect, status, + arg /* queryDesc */, false /* utility */, (ErrorData *)NULL); if (previous_query_info_collect_hook) { (*previous_query_info_collect_hook)(status, arg); } @@ -189,6 +198,55 @@ void ya_analyze_stats_collect_hook(QueryDesc *query_desc) { } #endif +static void ya_process_utility_hook(Node *parsetree, const char *queryString, + ProcessUtilityContext context, + ParamListInfo params, DestReceiver *dest, + char *completionTag) { + /* Project utility data on QueryDesc to use existing logic */ + QueryDesc *query_desc = (QueryDesc *)palloc0(sizeof(QueryDesc)); + query_desc->sourceText = queryString; + + cpp_call(get_sender(), &EventSender::query_metrics_collect, + METRICS_QUERY_SUBMIT, (void *)query_desc, true /* utility */, + (ErrorData *)NULL); + + get_sender()->incr_depth(); + PG_TRY(); + { + if (previous_ProcessUtility_hook) { + (*previous_ProcessUtility_hook)(parsetree, queryString, context, params, + dest, completionTag); + } else { + standard_ProcessUtility(parsetree, queryString, context, params, dest, + completionTag); + } + + get_sender()->decr_depth(); + cpp_call(get_sender(), &EventSender::query_metrics_collect, METRICS_QUERY_DONE, + (void 
*)query_desc, true /* utility */, (ErrorData *)NULL); + + pfree(query_desc); + } + PG_CATCH(); + { + ErrorData *edata; + MemoryContext oldctx; + + oldctx = MemoryContextSwitchTo(TopMemoryContext); + edata = CopyErrorData(); + FlushErrorState(); + MemoryContextSwitchTo(oldctx); + + get_sender()->decr_depth(); + cpp_call(get_sender(), &EventSender::query_metrics_collect, METRICS_QUERY_ERROR, + (void *)query_desc, true /* utility */, edata); + + pfree(query_desc); + ReThrowError(edata); + } + PG_END_TRY(); +} + static void check_stats_loaded() { if (!YagpStat::loaded()) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), diff --git a/src/hook_wrappers.h b/src/hook_wrappers.h index c158f42cf1d..cfabf39485e 100644 --- a/src/hook_wrappers.h +++ b/src/hook_wrappers.h @@ -9,6 +9,9 @@ extern void hooks_deinit(); extern void yagp_functions_reset(); extern Datum yagp_functions_get(FunctionCallInfo fcinfo); +extern void init_log(); +extern void truncate_log(); + #ifdef __cplusplus } #endif \ No newline at end of file diff --git a/src/log/LogOps.cpp b/src/log/LogOps.cpp new file mode 100644 index 00000000000..0868dd9fc1c --- /dev/null +++ b/src/log/LogOps.cpp @@ -0,0 +1,131 @@ +#include "protos/yagpcc_set_service.pb.h" + +#include "LogOps.h" +#include "LogSchema.h" + +extern "C" { +#include "postgres.h" + +#include "access/heapam.h" +#include "access/htup_details.h" +#include "access/xact.h" +#include "catalog/dependency.h" +#include "catalog/heap.h" +#include "catalog/namespace.h" +#include "catalog/pg_namespace.h" +#include "catalog/pg_type.h" +#include "cdb/cdbvars.h" +#include "commands/tablecmds.h" +#include "funcapi.h" +#include "fmgr.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/rel.h" +#include "utils/snapmgr.h" +#include "utils/timestamp.h" +} + +void init_log() { + Oid namespaceId; + Oid relationId; + ObjectAddress tableAddr; + ObjectAddress schemaAddr; + + namespaceId = get_namespace_oid(schema_name.data(), false /* 
missing_ok */); + + /* Create table */ + relationId = heap_create_with_catalog( + log_relname.data() /* relname */, namespaceId /* namespace */, + 0 /* tablespace */, InvalidOid /* relid */, InvalidOid /* reltype oid */, + InvalidOid /* reloftypeid */, GetUserId() /* owner */, + DescribeTuple() /* rel tuple */, NIL, InvalidOid /* relam */, + RELKIND_RELATION, RELPERSISTENCE_PERMANENT, RELSTORAGE_HEAP, false, false, + true, 0, ONCOMMIT_NOOP, NULL /* GP Policy */, (Datum)0, + false /* use_user_acl */, true, true, false /* valid_opts */, + false /* is_part_child */, false /* is part parent */, NULL); + + /* Make the table visible */ + CommandCounterIncrement(); + + /* Record dependency of the table on the schema */ + if (OidIsValid(relationId) && OidIsValid(namespaceId)) { + ObjectAddressSet(tableAddr, RelationRelationId, relationId); + ObjectAddressSet(schemaAddr, NamespaceRelationId, namespaceId); + + /* Table can be dropped only via DROP EXTENSION */ + recordDependencyOn(&tableAddr, &schemaAddr, DEPENDENCY_EXTENSION); + } else { + ereport(NOTICE, (errmsg("YAGPCC failed to create log table or schema"))); + } + + /* Make changes visible */ + CommandCounterIncrement(); +} + +void insert_log(const yagpcc::SetQueryReq &req, bool utility) { + Oid namespaceId; + Oid relationId; + Relation rel; + HeapTuple tuple; + + /* Return if xact is not valid (needed for catalog lookups). 
*/ + if (!IsTransactionState()) { + return; + } + + /* Return if extension was not loaded */ + namespaceId = get_namespace_oid(schema_name.data(), true /* missing_ok */); + if (!OidIsValid(namespaceId)) { + return; + } + + /* Return if the table was not created yet */ + relationId = get_relname_relid(log_relname.data(), namespaceId); + if (!OidIsValid(relationId)) { + return; + } + + bool nulls[natts_yagp_log]; + Datum values[natts_yagp_log]; + + memset(nulls, true, sizeof(nulls)); + memset(values, 0, sizeof(values)); + + extract_query_req(req, "", values, nulls); + nulls[attnum_yagp_log_utility] = false; + values[attnum_yagp_log_utility] = BoolGetDatum(utility); + + rel = heap_open(relationId, RowExclusiveLock); + + /* Insert the tuple as a frozen one to ensure it is logged even if txn rolls + * back or aborts */ + tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls); + frozen_heap_insert(rel, tuple); + + heap_freetuple(tuple); + /* Keep lock on rel until end of xact */ + heap_close(rel, NoLock); + + /* Make changes visible */ + CommandCounterIncrement(); +} + +void truncate_log() { + Oid namespaceId; + Oid relationId; + Relation relation; + + namespaceId = get_namespace_oid(schema_name.data(), false /* missing_ok */); + relationId = get_relname_relid(log_relname.data(), namespaceId); + + relation = heap_open(relationId, AccessExclusiveLock); + + /* Truncate the main table */ + heap_truncate_one_rel(relation); + + /* Keep lock on rel until end of xact */ + heap_close(relation, NoLock); + + /* Make changes visible */ + CommandCounterIncrement(); +} \ No newline at end of file diff --git a/src/log/LogOps.h b/src/log/LogOps.h new file mode 100644 index 00000000000..bad03d09a8f --- /dev/null +++ b/src/log/LogOps.h @@ -0,0 +1,19 @@ +#pragma once + +#include <string> + +extern "C" { +#include "postgres.h" +#include "fmgr.h" +} + +extern "C" { +/* CREATE TABLE yagpcc.__log (...); */ +void init_log(); + +/* TRUNCATE yagpcc.__log */ +void truncate_log(); +} + +/* 
INSERT INTO yagpcc.__log VALUES (...) */ +void insert_log(const yagpcc::SetQueryReq &req, bool utility); diff --git a/src/log/LogSchema.cpp b/src/log/LogSchema.cpp new file mode 100644 index 00000000000..335a3103cfd --- /dev/null +++ b/src/log/LogSchema.cpp @@ -0,0 +1,135 @@ +#include "google/protobuf/reflection.h" +#include "google/protobuf/descriptor.h" +#include "google/protobuf/timestamp.pb.h" + +#include "LogSchema.h" + +const std::unordered_map<std::string_view, size_t> &proto_name_to_col_idx() { + static const auto name_col_idx = [] { + std::unordered_map<std::string_view, size_t> map; + map.reserve(log_tbl_desc.size()); + + for (size_t idx = 0; idx < natts_yagp_log; ++idx) { + map.emplace(log_tbl_desc[idx].proto_field_name, idx); + } + + return map; + }(); + return name_col_idx; +} + +TupleDesc DescribeTuple() { + TupleDesc tupdesc = CreateTemplateTupleDesc(natts_yagp_log, false); + + for (size_t anum = 1; anum <= natts_yagp_log; ++anum) { + TupleDescInitEntry(tupdesc, anum, log_tbl_desc[anum - 1].pg_att_name.data(), + log_tbl_desc[anum - 1].type_oid, -1 /* typmod */, + 0 /* attdim */); + } + + return tupdesc; +} + +Datum protots_to_timestamptz(const google::protobuf::Timestamp &ts) { + TimestampTz pgtimestamp = + (TimestampTz)ts.seconds() * USECS_PER_SEC + (ts.nanos() / 1000); + pgtimestamp -= (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY; + return TimestampTzGetDatum(pgtimestamp); +} + +Datum field_to_datum(const google::protobuf::FieldDescriptor *field, + const google::protobuf::Reflection *reflection, + const google::protobuf::Message &msg) { + using namespace google::protobuf; + + switch (field->cpp_type()) { + case FieldDescriptor::CPPTYPE_INT32: + return Int32GetDatum(reflection->GetInt32(msg, field)); + case FieldDescriptor::CPPTYPE_INT64: + return Int64GetDatum(reflection->GetInt64(msg, field)); + case FieldDescriptor::CPPTYPE_UINT32: + return Int64GetDatum(reflection->GetUInt32(msg, field)); + case FieldDescriptor::CPPTYPE_UINT64: + 
return Int64GetDatum( + static_cast<int64_t>(reflection->GetUInt64(msg, field))); + case FieldDescriptor::CPPTYPE_DOUBLE: + return Float8GetDatum(reflection->GetDouble(msg, field)); + case FieldDescriptor::CPPTYPE_FLOAT: + return Float4GetDatum(reflection->GetFloat(msg, field)); + case FieldDescriptor::CPPTYPE_BOOL: + return BoolGetDatum(reflection->GetBool(msg, field)); + case FieldDescriptor::CPPTYPE_ENUM: + return CStringGetTextDatum(reflection->GetEnum(msg, field)->name().data()); + case FieldDescriptor::CPPTYPE_STRING: + return CStringGetTextDatum(reflection->GetString(msg, field).c_str()); + default: + return (Datum)0; + } +} + +void process_field(const google::protobuf::FieldDescriptor *field, + const google::protobuf::Reflection *reflection, + const google::protobuf::Message &msg, + const std::string &field_name, Datum *values, bool *nulls) { + + auto proto_idx_map = proto_name_to_col_idx(); + auto it = proto_idx_map.find(field_name); + + if (it == proto_idx_map.end()) { + ereport(NOTICE, + (errmsg("YAGPCC protobuf field %s is not registered in log table", + field_name.c_str()))); + return; + } + + int idx = it->second; + + if (!reflection->HasField(msg, field)) { + nulls[idx] = true; + return; + } + + if (field->cpp_type() == google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE && + field->message_type()->full_name() == "google.protobuf.Timestamp") { + const auto &ts = static_cast<const google::protobuf::Timestamp &>( + reflection->GetMessage(msg, field)); + values[idx] = protots_to_timestamptz(ts); + } else { + values[idx] = field_to_datum(field, reflection, msg); + } + nulls[idx] = false; + + return; +} + +void extract_query_req(const google::protobuf::Message &msg, + const std::string &prefix, Datum *values, bool *nulls) { + using namespace google::protobuf; + + const Descriptor *descriptor = msg.GetDescriptor(); + const Reflection *reflection = msg.GetReflection(); + + for (int i = 0; i < descriptor->field_count(); ++i) { + const FieldDescriptor *field = 
descriptor->field(i); + + // For now, we do not log any repeated fields plus they need special + // treatment. + if (field->is_repeated()) { + continue; + } + + std::string curr_pref = prefix.empty() ? "" : prefix + "."; + std::string field_name = curr_pref + field->name().data(); + + if (field->cpp_type() == FieldDescriptor::CPPTYPE_MESSAGE && + field->message_type()->full_name() != "google.protobuf.Timestamp") { + + if (reflection->HasField(msg, field)) { + const Message &nested = reflection->GetMessage(msg, field); + extract_query_req(nested, field_name, values, nulls); + } + } else { + process_field(field, reflection, msg, field_name, values, nulls); + } + } +} diff --git a/src/log/LogSchema.h b/src/log/LogSchema.h new file mode 100644 index 00000000000..f713c1e9b0e --- /dev/null +++ b/src/log/LogSchema.h @@ -0,0 +1,166 @@ +#pragma once + +#include <array> +#include <string> +#include <string_view> +#include <unordered_map> + +extern "C" { +#include "postgres.h" +#include "access/htup_details.h" +#include "access/tupdesc.h" +#include "catalog/pg_type.h" +#include "utils/timestamp.h" +#include "utils/builtins.h" +} + +namespace google { +namespace protobuf { +class FieldDescriptor; +class Message; +class Reflection; +class Timestamp; +} // namespace protobuf +} // namespace google + +inline constexpr std::string_view schema_name = "yagpcc"; +inline constexpr std::string_view log_relname = "__log"; + +struct LogDesc { + std::string_view pg_att_name; + std::string_view proto_field_name; + Oid type_oid; +}; + +/* + * Definition of the log table structure. + * + * System stats collected as %lu (unsigned) may + * overflow INT8OID (signed), but this is acceptable. 
+ */ +/* clang-format off */ +inline constexpr std::array log_tbl_desc = { + /* 8-byte aligned types first - Query Info */ + LogDesc{"query_id", "query_info.query_id", INT8OID}, + LogDesc{"plan_id", "query_info.plan_id", INT8OID}, + LogDesc{"nested_level", "add_info.nested_level", INT8OID}, + LogDesc{"slice_id", "add_info.slice_id", INT8OID}, + /* 8-byte aligned types - System Stats */ + LogDesc{"systemstat_vsize", "query_metrics.systemStat.vsize", INT8OID}, + LogDesc{"systemstat_rss", "query_metrics.systemStat.rss", INT8OID}, + LogDesc{"systemstat_vmsizekb", "query_metrics.systemStat.VmSizeKb", INT8OID}, + LogDesc{"systemstat_vmpeakkb", "query_metrics.systemStat.VmPeakKb", INT8OID}, + LogDesc{"systemstat_rchar", "query_metrics.systemStat.rchar", INT8OID}, + LogDesc{"systemstat_wchar", "query_metrics.systemStat.wchar", INT8OID}, + LogDesc{"systemstat_syscr", "query_metrics.systemStat.syscr", INT8OID}, + LogDesc{"systemstat_syscw", "query_metrics.systemStat.syscw", INT8OID}, + LogDesc{"systemstat_read_bytes", "query_metrics.systemStat.read_bytes", INT8OID}, + LogDesc{"systemstat_write_bytes", "query_metrics.systemStat.write_bytes", INT8OID}, + LogDesc{"systemstat_cancelled_write_bytes", "query_metrics.systemStat.cancelled_write_bytes", INT8OID}, + /* 8-byte aligned types - Metric Instrumentation */ + LogDesc{"instrumentation_ntuples", "query_metrics.instrumentation.ntuples", INT8OID}, + LogDesc{"instrumentation_nloops", "query_metrics.instrumentation.nloops", INT8OID}, + LogDesc{"instrumentation_tuplecount", "query_metrics.instrumentation.tuplecount", INT8OID}, + LogDesc{"instrumentation_shared_blks_hit", "query_metrics.instrumentation.shared_blks_hit", INT8OID}, + LogDesc{"instrumentation_shared_blks_read", "query_metrics.instrumentation.shared_blks_read", INT8OID}, + LogDesc{"instrumentation_shared_blks_dirtied", "query_metrics.instrumentation.shared_blks_dirtied", INT8OID}, + LogDesc{"instrumentation_shared_blks_written", 
"query_metrics.instrumentation.shared_blks_written", INT8OID}, + LogDesc{"instrumentation_local_blks_hit", "query_metrics.instrumentation.local_blks_hit", INT8OID}, + LogDesc{"instrumentation_local_blks_read", "query_metrics.instrumentation.local_blks_read", INT8OID}, + LogDesc{"instrumentation_local_blks_dirtied", "query_metrics.instrumentation.local_blks_dirtied", INT8OID}, + LogDesc{"instrumentation_local_blks_written", "query_metrics.instrumentation.local_blks_written", INT8OID}, + LogDesc{"instrumentation_temp_blks_read", "query_metrics.instrumentation.temp_blks_read", INT8OID}, + LogDesc{"instrumentation_temp_blks_written", "query_metrics.instrumentation.temp_blks_written", INT8OID}, + LogDesc{"instrumentation_inherited_calls", "query_metrics.instrumentation.inherited_calls", INT8OID}, + /* 8-byte aligned types - Network Stats */ + LogDesc{"instrumentation_sent_total_bytes", "query_metrics.instrumentation.sent.total_bytes", INT8OID}, + LogDesc{"instrumentation_sent_tuple_bytes", "query_metrics.instrumentation.sent.tuple_bytes", INT8OID}, + LogDesc{"instrumentation_sent_chunks", "query_metrics.instrumentation.sent.chunks", INT8OID}, + LogDesc{"instrumentation_received_total_bytes", "query_metrics.instrumentation.received.total_bytes", INT8OID}, + LogDesc{"instrumentation_received_tuple_bytes", "query_metrics.instrumentation.received.tuple_bytes", INT8OID}, + LogDesc{"instrumentation_received_chunks", "query_metrics.instrumentation.received.chunks", INT8OID}, + /* 8-byte aligned types - Interconnect Stats and spilled bytes */ + LogDesc{"interconnect_total_recv_queue_size", "query_metrics.instrumentation.interconnect.total_recv_queue_size", INT8OID}, + LogDesc{"interconnect_recv_queue_size_counting_time", "query_metrics.instrumentation.interconnect.recv_queue_size_counting_time", INT8OID}, + LogDesc{"interconnect_total_capacity", "query_metrics.instrumentation.interconnect.total_capacity", INT8OID}, + LogDesc{"interconnect_capacity_counting_time", 
"query_metrics.instrumentation.interconnect.capacity_counting_time", INT8OID}, + LogDesc{"interconnect_total_buffers", "query_metrics.instrumentation.interconnect.total_buffers", INT8OID}, + LogDesc{"interconnect_buffer_counting_time", "query_metrics.instrumentation.interconnect.buffer_counting_time", INT8OID}, + LogDesc{"interconnect_active_connections_num", "query_metrics.instrumentation.interconnect.active_connections_num", INT8OID}, + LogDesc{"interconnect_retransmits", "query_metrics.instrumentation.interconnect.retransmits", INT8OID}, + LogDesc{"interconnect_startup_cached_pkt_num", "query_metrics.instrumentation.interconnect.startup_cached_pkt_num", INT8OID}, + LogDesc{"interconnect_mismatch_num", "query_metrics.instrumentation.interconnect.mismatch_num", INT8OID}, + LogDesc{"interconnect_crc_errors", "query_metrics.instrumentation.interconnect.crc_errors", INT8OID}, + LogDesc{"interconnect_snd_pkt_num", "query_metrics.instrumentation.interconnect.snd_pkt_num", INT8OID}, + LogDesc{"interconnect_recv_pkt_num", "query_metrics.instrumentation.interconnect.recv_pkt_num", INT8OID}, + LogDesc{"interconnect_disordered_pkt_num", "query_metrics.instrumentation.interconnect.disordered_pkt_num", INT8OID}, + LogDesc{"interconnect_duplicated_pkt_num", "query_metrics.instrumentation.interconnect.duplicated_pkt_num", INT8OID}, + LogDesc{"interconnect_recv_ack_num", "query_metrics.instrumentation.interconnect.recv_ack_num", INT8OID}, + LogDesc{"interconnect_status_query_msg_num", "query_metrics.instrumentation.interconnect.status_query_msg_num", INT8OID}, + LogDesc{"spill_totalbytes", "query_metrics.spill.totalBytes", INT8OID}, + /* 8-byte aligned types - Float and Timestamp */ + LogDesc{"systemstat_runningtimeseconds", "query_metrics.systemStat.runningTimeSeconds", FLOAT8OID}, + LogDesc{"systemstat_usertimeseconds", "query_metrics.systemStat.userTimeSeconds", FLOAT8OID}, + LogDesc{"systemstat_kerneltimeseconds", "query_metrics.systemStat.kernelTimeSeconds", FLOAT8OID}, + 
LogDesc{"instrumentation_firsttuple", "query_metrics.instrumentation.firsttuple", FLOAT8OID}, + LogDesc{"instrumentation_startup", "query_metrics.instrumentation.startup", FLOAT8OID}, + LogDesc{"instrumentation_total", "query_metrics.instrumentation.total", FLOAT8OID}, + LogDesc{"instrumentation_blk_read_time", "query_metrics.instrumentation.blk_read_time", FLOAT8OID}, + LogDesc{"instrumentation_blk_write_time", "query_metrics.instrumentation.blk_write_time", FLOAT8OID}, + LogDesc{"instrumentation_startup_time", "query_metrics.instrumentation.startup_time", FLOAT8OID}, + LogDesc{"instrumentation_inherited_time", "query_metrics.instrumentation.inherited_time", FLOAT8OID}, + LogDesc{"datetime", "datetime", TIMESTAMPTZOID}, + LogDesc{"submit_time", "submit_time", TIMESTAMPTZOID}, + LogDesc{"start_time", "start_time", TIMESTAMPTZOID}, + LogDesc{"end_time", "end_time", TIMESTAMPTZOID}, + /* 4-byte aligned types - Query Key */ + LogDesc{"tmid", "query_key.tmid", INT4OID}, + LogDesc{"ssid", "query_key.ssid", INT4OID}, + LogDesc{"ccnt", "query_key.ccnt", INT4OID}, + /* 4-byte aligned types - Segment Key */ + LogDesc{"dbid", "segment_key.dbid", INT4OID}, + LogDesc{"segid", "segment_key.segindex", INT4OID}, + LogDesc{"spill_filecount", "query_metrics.spill.fileCount", INT4OID}, + /* Variable-length types - Query Info */ + LogDesc{"generator", "query_info.generator", TEXTOID}, + LogDesc{"query_text", "query_info.query_text", TEXTOID}, + LogDesc{"plan_text", "query_info.plan_text", TEXTOID}, + LogDesc{"template_query_text", "query_info.template_query_text", TEXTOID}, + LogDesc{"template_plan_text", "query_info.template_plan_text", TEXTOID}, + LogDesc{"user_name", "query_info.userName", TEXTOID}, + LogDesc{"database_name", "query_info.databaseName", TEXTOID}, + LogDesc{"rsgname", "query_info.rsgname", TEXTOID}, + LogDesc{"analyze_text", "query_info.analyze_text", TEXTOID}, + LogDesc{"error_message", "add_info.error_message", TEXTOID}, + LogDesc{"query_status", "query_status", 
TEXTOID}, + /* Extra field */ + LogDesc{"utility", "", BOOLOID}, +}; +/* clang-format on */ + +inline constexpr size_t natts_yagp_log = log_tbl_desc.size(); +inline constexpr size_t attnum_yagp_log_utility = natts_yagp_log - 1; + +const std::unordered_map<std::string_view, size_t> &proto_name_to_col_idx(); + +TupleDesc DescribeTuple(); + +Datum protots_to_timestamptz(const google::protobuf::Timestamp &ts); + +Datum field_to_datum(const google::protobuf::FieldDescriptor *field, + const google::protobuf::Reflection *reflection, + const google::protobuf::Message &msg); + +/* Process a single proto field and store in values/nulls arrays */ +void process_field(const google::protobuf::FieldDescriptor *field, + const google::protobuf::Reflection *reflection, + const google::protobuf::Message &msg, + const std::string &field_name, Datum *values, bool *nulls); + +/* + * Extracts values from msg into values/nulls arrays. Caller must + * pre-init nulls[] to true (this function does not set nulls
+ */ +void extract_query_req(const google::protobuf::Message &msg, + const std::string &prefix, Datum *values, bool *nulls); diff --git a/src/memory/gpdbwrappers.cpp b/src/memory/gpdbwrappers.cpp index 9d579a91a30..0824a3a6808 100644 --- a/src/memory/gpdbwrappers.cpp +++ b/src/memory/gpdbwrappers.cpp @@ -1,4 +1,5 @@ #include "gpdbwrappers.h" +#include "log/LogOps.h" extern "C" { #include "postgres.h" @@ -126,8 +127,8 @@ ExplainState ya_gpdb::get_explain_state(QueryDesc *query_desc, }); } -ExplainState ya_gpdb::get_analyze_state_json(QueryDesc *query_desc, - bool analyze) noexcept { +ExplainState ya_gpdb::get_analyze_state(QueryDesc *query_desc, + bool analyze) noexcept { return wrap_noexcept([&]() { ExplainState es; ExplainInitState(&es); @@ -136,7 +137,7 @@ ExplainState ya_gpdb::get_analyze_state_json(QueryDesc *query_desc, es.buffers = es.analyze; es.timing = es.analyze; es.summary = es.analyze; - es.format = EXPLAIN_FORMAT_JSON; + es.format = EXPLAIN_FORMAT_TEXT; ExplainBeginOutput(&es); if (analyze) { ExplainPrintPlan(&es, query_desc); @@ -220,4 +221,8 @@ char *ya_gpdb::get_rg_name_for_id(Oid group_id) { Oid ya_gpdb::get_rg_id_by_session_id(int session_id) { return wrap_throw(ResGroupGetGroupIdBySessionId, session_id); -} \ No newline at end of file +} + +void ya_gpdb::insert_log(const yagpcc::SetQueryReq &req, bool utility) { + return wrap_throw(::insert_log, req, utility); +} diff --git a/src/memory/gpdbwrappers.h b/src/memory/gpdbwrappers.h index ad7ae96c362..8f5f146cc67 100644 --- a/src/memory/gpdbwrappers.h +++ b/src/memory/gpdbwrappers.h @@ -16,6 +16,10 @@ extern "C" { #include <utility> #include <string> +namespace yagpcc { +class SetQueryReq; +} // namespace yagpcc + namespace ya_gpdb { // Functions that call palloc(). 
@@ -27,8 +31,7 @@ char *get_database_name(Oid dbid) noexcept; bool split_identifier_string(char *rawstring, char separator, List **namelist) noexcept; ExplainState get_explain_state(QueryDesc *query_desc, bool costs) noexcept; -ExplainState get_analyze_state_json(QueryDesc *query_desc, - bool analyze) noexcept; +ExplainState get_analyze_state(QueryDesc *query_desc, bool analyze) noexcept; Instrumentation *instr_alloc(size_t n, int instrument_options); HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull); @@ -38,6 +41,7 @@ void instr_end_loop(Instrumentation *instr); char *gen_normquery(const char *query); StringInfo gen_normplan(const char *executionPlan); char *get_rg_name_for_id(Oid group_id); +void insert_log(const yagpcc::SetQueryReq &req, bool utility); // Palloc-free functions. void pfree(void *pointer) noexcept; diff --git a/src/yagp_hooks_collector.c b/src/yagp_hooks_collector.c index 2a9e7328e6d..9db73638b24 100644 --- a/src/yagp_hooks_collector.c +++ b/src/yagp_hooks_collector.c @@ -10,6 +10,8 @@ void _PG_init(void); void _PG_fini(void); PG_FUNCTION_INFO_V1(yagp_stat_messages_reset); PG_FUNCTION_INFO_V1(yagp_stat_messages); +PG_FUNCTION_INFO_V1(yagp_init_log); +PG_FUNCTION_INFO_V1(yagp_truncate_log); void _PG_init(void) { if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_EXECUTE) { @@ -30,4 +32,14 @@ Datum yagp_stat_messages_reset(PG_FUNCTION_ARGS) { Datum yagp_stat_messages(PG_FUNCTION_ARGS) { return yagp_functions_get(fcinfo); -} \ No newline at end of file +} + +Datum yagp_init_log(PG_FUNCTION_ARGS) { + init_log(); + PG_RETURN_VOID(); +} + +Datum yagp_truncate_log(PG_FUNCTION_ARGS) { + truncate_log(); + PG_RETURN_VOID(); +} diff --git a/yagp_hooks_collector--1.0--1.1.sql b/yagp_hooks_collector--1.0--1.1.sql new file mode 100644 index 00000000000..959d4f235d1 --- /dev/null +++ b/yagp_hooks_collector--1.0--1.1.sql @@ -0,0 +1,113 @@ +/* yagp_hooks_collector--1.0--1.1.sql */ + +-- complain if script is sourced in psql, 
rather than via ALTER EXTENSION +\echo Use "ALTER EXTENSION yagp_hooks_collector UPDATE TO '1.1'" to load this file. \quit + +CREATE SCHEMA yagpcc; + +-- Unlink existing objects from extension. +ALTER EXTENSION yagp_hooks_collector DROP VIEW yagp_stat_messages; +ALTER EXTENSION yagp_hooks_collector DROP FUNCTION yagp_stat_messages_reset(); +ALTER EXTENSION yagp_hooks_collector DROP FUNCTION __yagp_stat_messages_f_on_segments(); +ALTER EXTENSION yagp_hooks_collector DROP FUNCTION __yagp_stat_messages_f_on_master(); +ALTER EXTENSION yagp_hooks_collector DROP FUNCTION __yagp_stat_messages_reset_f_on_segments(); +ALTER EXTENSION yagp_hooks_collector DROP FUNCTION __yagp_stat_messages_reset_f_on_master(); + +-- Now drop the objects. +DROP VIEW yagp_stat_messages; +DROP FUNCTION yagp_stat_messages_reset(); +DROP FUNCTION __yagp_stat_messages_f_on_segments(); +DROP FUNCTION __yagp_stat_messages_f_on_master(); +DROP FUNCTION __yagp_stat_messages_reset_f_on_segments(); +DROP FUNCTION __yagp_stat_messages_reset_f_on_master(); + +-- Recreate functions and view in new schema. 
+CREATE FUNCTION yagpcc.__stat_messages_reset_f_on_master() +RETURNS void +AS 'MODULE_PATHNAME', 'yagp_stat_messages_reset' +LANGUAGE C EXECUTE ON MASTER; + +CREATE FUNCTION yagpcc.__stat_messages_reset_f_on_segments() +RETURNS void +AS 'MODULE_PATHNAME', 'yagp_stat_messages_reset' +LANGUAGE C EXECUTE ON ALL SEGMENTS; + +CREATE FUNCTION yagpcc.stat_messages_reset() +RETURNS void +AS +$$ + SELECT yagpcc.__stat_messages_reset_f_on_master(); + SELECT yagpcc.__stat_messages_reset_f_on_segments(); +$$ +LANGUAGE SQL EXECUTE ON MASTER; + +CREATE FUNCTION yagpcc.__stat_messages_f_on_master() +RETURNS SETOF record +AS 'MODULE_PATHNAME', 'yagp_stat_messages' +LANGUAGE C STRICT VOLATILE EXECUTE ON MASTER; + +CREATE FUNCTION yagpcc.__stat_messages_f_on_segments() +RETURNS SETOF record +AS 'MODULE_PATHNAME', 'yagp_stat_messages' +LANGUAGE C STRICT VOLATILE EXECUTE ON ALL SEGMENTS; + +CREATE VIEW yagpcc.stat_messages AS + SELECT C.* + FROM yagpcc.__stat_messages_f_on_master() as C ( + segid int, + total_messages bigint, + send_failures bigint, + connection_failures bigint, + other_errors bigint, + max_message_size int + ) + UNION ALL + SELECT C.* + FROM yagpcc.__stat_messages_f_on_segments() as C ( + segid int, + total_messages bigint, + send_failures bigint, + connection_failures bigint, + other_errors bigint, + max_message_size int + ) +ORDER BY segid; + +-- Create new objects. +CREATE FUNCTION yagpcc.__init_log_on_master() +RETURNS void +AS 'MODULE_PATHNAME', 'yagp_init_log' +LANGUAGE C STRICT VOLATILE EXECUTE ON MASTER; + +CREATE FUNCTION yagpcc.__init_log_on_segments() +RETURNS void +AS 'MODULE_PATHNAME', 'yagp_init_log' +LANGUAGE C STRICT VOLATILE EXECUTE ON ALL SEGMENTS; + +-- Creates log table inside yagpcc schema. 
+SELECT yagpcc.__init_log_on_master(); +SELECT yagpcc.__init_log_on_segments(); + +CREATE VIEW yagpcc.log AS + SELECT * FROM yagpcc.__log -- master + UNION ALL + SELECT * FROM gp_dist_random('yagpcc.__log') -- segments + ORDER BY tmid, ssid, ccnt; + +CREATE FUNCTION yagpcc.__truncate_log_on_master() +RETURNS void +AS 'MODULE_PATHNAME', 'yagp_truncate_log' +LANGUAGE C STRICT VOLATILE EXECUTE ON MASTER; + +CREATE FUNCTION yagpcc.__truncate_log_on_segments() +RETURNS void +AS 'MODULE_PATHNAME', 'yagp_truncate_log' +LANGUAGE C STRICT VOLATILE EXECUTE ON ALL SEGMENTS; + +CREATE FUNCTION yagpcc.truncate_log() +RETURNS void AS $$ +BEGIN + PERFORM yagpcc.__truncate_log_on_master(); + PERFORM yagpcc.__truncate_log_on_segments(); +END; +$$ LANGUAGE plpgsql VOLATILE; diff --git a/sql/yagp_hooks_collector--1.0.sql b/yagp_hooks_collector--1.0.sql similarity index 99% rename from sql/yagp_hooks_collector--1.0.sql rename to yagp_hooks_collector--1.0.sql index 88bbe4e0dc7..7ab4e1b2fb7 100644 --- a/sql/yagp_hooks_collector--1.0.sql +++ b/yagp_hooks_collector--1.0.sql @@ -15,7 +15,7 @@ LANGUAGE C EXECUTE ON ALL SEGMENTS; CREATE FUNCTION yagp_stat_messages_reset() RETURNS void -AS +AS $$ SELECT __yagp_stat_messages_reset_f_on_master(); SELECT __yagp_stat_messages_reset_f_on_segments(); diff --git a/yagp_hooks_collector--1.1.sql b/yagp_hooks_collector--1.1.sql new file mode 100644 index 00000000000..657720a88f2 --- /dev/null +++ b/yagp_hooks_collector--1.1.sql @@ -0,0 +1,95 @@ +/* yagp_hooks_collector--1.1.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION yagp_hooks_collector" to load this file. 
\quit + +CREATE SCHEMA yagpcc; + +CREATE FUNCTION yagpcc.__stat_messages_reset_f_on_master() +RETURNS void +AS 'MODULE_PATHNAME', 'yagp_stat_messages_reset' +LANGUAGE C EXECUTE ON MASTER; + +CREATE FUNCTION yagpcc.__stat_messages_reset_f_on_segments() +RETURNS void +AS 'MODULE_PATHNAME', 'yagp_stat_messages_reset' +LANGUAGE C EXECUTE ON ALL SEGMENTS; + +CREATE FUNCTION yagpcc.stat_messages_reset() +RETURNS void +AS +$$ + SELECT yagpcc.__stat_messages_reset_f_on_master(); + SELECT yagpcc.__stat_messages_reset_f_on_segments(); +$$ +LANGUAGE SQL EXECUTE ON MASTER; + +CREATE FUNCTION yagpcc.__stat_messages_f_on_master() +RETURNS SETOF record +AS 'MODULE_PATHNAME', 'yagp_stat_messages' +LANGUAGE C STRICT VOLATILE EXECUTE ON MASTER; + +CREATE FUNCTION yagpcc.__stat_messages_f_on_segments() +RETURNS SETOF record +AS 'MODULE_PATHNAME', 'yagp_stat_messages' +LANGUAGE C STRICT VOLATILE EXECUTE ON ALL SEGMENTS; + +CREATE VIEW yagpcc.stat_messages AS + SELECT C.* + FROM yagpcc.__stat_messages_f_on_master() as C ( + segid int, + total_messages bigint, + send_failures bigint, + connection_failures bigint, + other_errors bigint, + max_message_size int + ) + UNION ALL + SELECT C.* + FROM yagpcc.__stat_messages_f_on_segments() as C ( + segid int, + total_messages bigint, + send_failures bigint, + connection_failures bigint, + other_errors bigint, + max_message_size int + ) +ORDER BY segid; + +CREATE FUNCTION yagpcc.__init_log_on_master() +RETURNS void +AS 'MODULE_PATHNAME', 'yagp_init_log' +LANGUAGE C STRICT VOLATILE EXECUTE ON MASTER; + +CREATE FUNCTION yagpcc.__init_log_on_segments() +RETURNS void +AS 'MODULE_PATHNAME', 'yagp_init_log' +LANGUAGE C STRICT VOLATILE EXECUTE ON ALL SEGMENTS; + +-- Creates log table inside yagpcc schema. 
+SELECT yagpcc.__init_log_on_master(); +SELECT yagpcc.__init_log_on_segments(); + +CREATE VIEW yagpcc.log AS + SELECT * FROM yagpcc.__log -- master + UNION ALL + SELECT * FROM gp_dist_random('yagpcc.__log') -- segments +ORDER BY tmid, ssid, ccnt; + +CREATE FUNCTION yagpcc.__truncate_log_on_master() +RETURNS void +AS 'MODULE_PATHNAME', 'yagp_truncate_log' +LANGUAGE C STRICT VOLATILE EXECUTE ON MASTER; + +CREATE FUNCTION yagpcc.__truncate_log_on_segments() +RETURNS void +AS 'MODULE_PATHNAME', 'yagp_truncate_log' +LANGUAGE C STRICT VOLATILE EXECUTE ON ALL SEGMENTS; + +CREATE FUNCTION yagpcc.truncate_log() +RETURNS void AS $$ +BEGIN + PERFORM yagpcc.__truncate_log_on_master(); + PERFORM yagpcc.__truncate_log_on_segments(); +END; +$$ LANGUAGE plpgsql VOLATILE; diff --git a/yagp_hooks_collector.control b/yagp_hooks_collector.control index b5539dd6462..cb5906a1302 100644 --- a/yagp_hooks_collector.control +++ b/yagp_hooks_collector.control @@ -1,5 +1,5 @@ # yagp_hooks_collector extension comment = 'Intercept query and plan execution hooks and report them to Yandex GPCC agents' -default_version = '1.0' +default_version = '1.1' module_pathname = '$libdir/yagp_hooks_collector' superuser = true --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
