gerdansantos commented on issue #4065: NIFI-4239 - Adding CaptureChangePostgreSQL processor to capture data changes (INSERT/UPDATE/DELETE) in PostgreSQL tables via Logical Replication.
URL: https://github.com/apache/nifi/pull/4065#issuecomment-590651473

> This is going to be a great new feature! I left some initial comments, and I haven't been able to get it to work yet on my PostgreSQL 12 server. Is there some n00b tutorial you can point me to so I can make sure I set everything up right? I created a publication and a replication slot, but I don't know how/if they work together, and I didn't get any events into the processor.

Configuration on PostgreSQL Server
=============

postgresql.conf
---------------

Your database must be configured to enable logical replication.

* Property **max_wal_senders** should be at least equal to the number of replication consumers.
* Property **wal_keep_segments** sets the number of WAL segments that the database must retain (they cannot be removed).
* Property **wal_level** must be set to logical for logical replication.
* Property **max_replication_slots** must be greater than zero, because logical replication cannot work without a replication slot.

Example:

```
listen_addresses = '*'     # what IP address(es) to listen on;
                           # comma-separated list of addresses;
                           # defaults to 'localhost'; use '*' for all
                           # (change requires restart)
max_wal_senders = 4        # max number of walsender processes
wal_keep_segments = 4      # in logfile segments, 16MB each; 0 disables
wal_level = logical        # minimal, replica, or logical
max_replication_slots = 4  # max number of replication slots
```

After changing the configuration, restart the PostgreSQL service (a reload is not enough for wal_level, max_wal_senders, or max_replication_slots).

pg_hba.conf
-----------

Allow a user with replication privileges to connect to the replication stream and to run statements on the PostgreSQL server.
Example: put these lines in pg_hba.conf:

```
host    all            all    all    md5    # For snapshot connection
host    replication    all    all    md5    # For replication connection
```

After changing the configuration, restart the PostgreSQL service.

Example of Use
===============

PostgreSQL
----------

Create a PostgreSQL table. This example uses the postgres database, for learning purposes only.

Table structure:

```
              Table "public.cidade"
  Column   |  Type   | Collation | Nullable | Default
-----------+---------+-----------+----------+---------
 codigo    | integer |           | not null |
 data_fund | date    |           | not null |
 nome      | text    |           |          |
Publications:
    "cidade_pub"
```

```
CREATE TABLE cidade(codigo integer not null, data_fund date not null, nome text);
ALTER TABLE cidade REPLICA IDENTITY FULL;
```

**Note**: A published table must have a “replica identity” configured in order to be able to replicate UPDATE and DELETE operations, so that appropriate rows to update or delete can be identified on the subscriber side. By default, this is the primary key, if there is one. Another unique index (with certain additional requirements) can also be set to be the replica identity. If the table does not have any suitable key, then it can be set to replica identity “full”, which means the entire row becomes the key. This, however, is very inefficient and should only be used as a fallback if no other solution is possible.

More details: https://www.postgresql.org/docs/10/logical-replication-publication.html

First, you need a publication for the tables whose data changes you want to capture:

```
CREATE PUBLICATION cidade_pub FOR TABLE cidade;
```

You can let the NiFi CDC PostgreSQL processor create a replication slot for you, or you can create one manually:
```
SELECT * FROM pg_create_logical_replication_slot('slt_cidade_pub', 'pgoutput');
```

Configure the NiFi CDC PostgreSQL processor to connect to this PostgreSQL instance. Example:

```
PostgreSQL Host: 127.0.0.1
PostgreSQL Driver Class Name: org.postgresql.Driver
PostgreSQL Driver Location(s): /nifi-1.11.0/jdbc/postgresql-42.2.9.jar
Database: postgres
Username: postgres
Password: 123123
Publication: cidade_pub
Slot Name: slt_cidade_pub
Make Snapshot: false
Include Begin/Commit Events: false
Initial Log Sequence Number - LSN: [empty property]
Drop If Exists Replication Slot: false
```

Connect the processor to a LogAttribute processor, for example, and start CaptureChangePostgreSQL. Then execute the following on the PostgreSQL server:

```
INSERT INTO cidade (codigo, data_fund, nome) VALUES (4, now(), 'New York');
UPDATE cidade SET codigo = 20 WHERE codigo = 4;
UPDATE cidade SET nome = 'Statue of Liberty City' WHERE nome = 'New York';
DELETE FROM cidade WHERE codigo >= 4;
```

Output in the NiFi queue:

```
{"insert":{"cidade":{"codigo":4,"nome":"New York","data_fund":"2020-02-24"}}}
{"update":{"cidade":{"codigo":20,"nome":"New York","data_fund":"2020-02-24"}}}
{"update":{"cidade":{"codigo":20,"nome":"Statue of Liberty City","data_fund":"2020-02-24"}}}
{"delete":{"cidade":{"codigo":20,"nome":"Statue of Liberty City","data_fund":"2020-02-24"}}}
```
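For anyone consuming these events downstream, here is a minimal Python sketch of how the sample output above could be unpacked. It assumes every event is a single JSON object shaped like `{"<operation>": {"<table>": {<column>: <value>}}}`, which is only what the four sample lines show; the processor may emit other shapes (for example when begin/commit events are enabled). `ChangeEvent` and `parse_change_event` are hypothetical names for illustration and are not part of NiFi or the processor.

```python
import json
from typing import Any, Dict, NamedTuple


class ChangeEvent(NamedTuple):
    """Hypothetical container for one change event (not a NiFi type)."""
    operation: str
    table: str
    row: Dict[str, Any]


def parse_change_event(line: str) -> ChangeEvent:
    """Parse one event of the assumed shape {"<op>": {"<table>": {...row...}}}.

    Raises ValueError for any other shape, so unexpected events are not
    silently misread.
    """
    payload = json.loads(line)
    if not isinstance(payload, dict) or len(payload) != 1:
        raise ValueError("expected a single top-level operation key")
    operation, tables = next(iter(payload.items()))
    if not isinstance(tables, dict) or len(tables) != 1:
        raise ValueError("expected a single table key under the operation")
    table, row = next(iter(tables.items()))
    if not isinstance(row, dict):
        raise ValueError("expected a column-to-value object for the row")
    return ChangeEvent(operation, table, row)


# The four events copied from the sample output, one JSON object per line.
SAMPLE_OUTPUT = """\
{"insert":{"cidade":{"codigo":4,"nome":"New York","data_fund":"2020-02-24"}}}
{"update":{"cidade":{"codigo":20,"nome":"New York","data_fund":"2020-02-24"}}}
{"update":{"cidade":{"codigo":20,"nome":"Statue of Liberty City","data_fund":"2020-02-24"}}}
{"delete":{"cidade":{"codigo":20,"nome":"Statue of Liberty City","data_fund":"2020-02-24"}}}
"""

events = [parse_change_event(line) for line in SAMPLE_OUTPUT.splitlines() if line.strip()]
for event in events:
    print(event.operation, event.table, event.row["codigo"], event.row["nome"])
```

Rejecting unknown shapes with `ValueError` is a deliberate choice in this sketch: it makes a format difference visible immediately instead of producing half-parsed rows.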
