gerdansantos commented on issue #4065: NIFI-4239 - Adding 
CaptureChangePostgreSQL processor to capture data changes 
(INSERT/UPDATE/DELETE) in PostgreSQL tables via Logical Replication.
URL: https://github.com/apache/nifi/pull/4065#issuecomment-590651473
 
 
   > This is going to be a great new feature! I left some initial comments, and 
I haven't been able to get it to work yet on my PostgreSQL 12 server. Is there 
some n00b tutorial you can point me to so I can make sure I set everything up 
right? I created a publication and a replication slot, but I don't know how/if 
they work together, and I didn't get any events into the processor.
   
   Configuration on PostgreSQL Server
   =============
   
   postgresql.conf
   ---------------
   
   Your database should be configured to enable logical replication.
   
   * Property **max_wal_senders** should be at least equal to the number of replication consumers.
   * Property **wal_keep_segments** should be set to the number of WAL segments that must be retained and cannot be removed from the database (replaced by **wal_keep_size** in PostgreSQL 13).
   * Property **wal_level** should be set to `logical` for logical replication.
   * Property **max_replication_slots** should be greater than zero, because logical replication cannot work without a replication slot.
   
   Example:
   ```
   listen_addresses = '*'               # what IP address(es) to listen on;
                                   # comma-separated list of addresses;
                                   # defaults to 'localhost'; use '*' for all
                                   # (change requires restart)
   max_wal_senders = 4             # max number of walsender processes
   wal_keep_segments = 4           # in logfile segments, 16MB each; 0 disables
   wal_level = logical             # minimal, replica, or logical
   max_replication_slots = 4       # max number of replication slots
   ```
   
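   After changing these settings, restart the PostgreSQL service; **wal_level**, **max_wal_senders**, and **max_replication_slots** only take effect on a restart, not on a reload.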
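
As a quick sanity check of the settings listed above, here is a minimal Python sketch that parses postgresql.conf-style text and reports which of the logical replication requirements are not met. The function names `parse_conf` and `check_logical_replication` are hypothetical, the parser is simplified (it does not handle `#` inside quoted values or `include` directives), and it only looks at values set explicitly in the text rather than resolving server defaults.

```python
import re

def parse_conf(text):
    """Parse 'name = value  # comment' lines into a dict (simplified)."""
    settings = {}
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()  # drop trailing comments
        if not line:
            continue
        match = re.match(r"^(\w+)\s*=?\s*(.*)$", line)  # '=' is optional
        if match:
            settings[match.group(1).lower()] = match.group(2).strip().strip("'")
    return settings

def check_logical_replication(settings, consumers=1):
    """Return a list of problems; an empty list means the checks passed."""
    problems = []
    for name in ("wal_level", "max_wal_senders", "max_replication_slots"):
        if name not in settings:
            problems.append(f"{name} is not set explicitly")
    if "wal_level" in settings and settings["wal_level"] != "logical":
        problems.append("wal_level must be logical")
    if "max_wal_senders" in settings and int(settings["max_wal_senders"]) < consumers:
        problems.append("max_wal_senders is lower than the number of consumers")
    if "max_replication_slots" in settings and int(settings["max_replication_slots"]) < 1:
        problems.append("max_replication_slots must be greater than zero")
    return problems

EXAMPLE = """
max_wal_senders = 4             # max number of walsender processes
wal_keep_segments = 4           # in logfile segments, 16MB each; 0 disables
wal_level = logical             # minimal, replica, or logical
max_replication_slots = 4       # max number of replication slots
"""

if __name__ == "__main__":
    print(check_logical_replication(parse_conf(EXAMPLE)))
```

On a running server, `SHOW wal_level;` gives the effective value directly; the sketch is only useful for checking a file before restarting.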
   
   pg_hba.conf
   -----------
   
   Allow a user with replication privileges to connect to the replication stream and to run statements on the PostgreSQL server.
   
   For example, add these lines to pg_hba.conf:
   ```
   host    all           all              all                     md5 # For snapshot connection
   host    replication   all              all                     md5 # For replication connection
   ```
   
   After changing pg_hba.conf, restart the PostgreSQL service (a reload is also sufficient for this file).
   
   
   Example of Use
   ===============
   
   PostgreSQL
   ----------
   Create a PostgreSQL table. This example uses the `postgres` database, for learning purposes only.
   
   Table structure:
   
   ```
                   Table "public.cidade"
     Column   |  Type   | Collation | Nullable | Default
   -----------+---------+-----------+----------+---------
    codigo    | integer |           | not null |
    data_fund | date    |           | not null |
    nome      | text    |           |          |
   Publications:
       "cidade_pub"
   ```
   ```
   CREATE TABLE cidade(codigo integer not null, data_fund date not null, nome text);
   ALTER TABLE cidade REPLICA IDENTITY FULL;
   ```
   **Note**: A published table must have a “replica identity” configured in 
order to be able to replicate UPDATE and DELETE operations, so that appropriate 
rows to update or delete can be identified on the subscriber side. By default, 
this is the primary key, if there is one. Another unique index (with certain 
additional requirements) can also be set to be the replica identity. If the 
table does not have any suitable key, then it can be set to replica identity 
“full”, which means the entire row becomes the key. This, however, is very 
inefficient and should only be used as a fallback if no other solution is 
possible. More details: 
https://www.postgresql.org/docs/10/logical-replication-publication.html
   
   
   First, create a publication for the tables whose data changes you want to capture:
   ```
   CREATE PUBLICATION cidade_pub FOR TABLE cidade;
   ```
   
   You can let the NiFi CDC PostgreSQL processor create the replication slot for you, or you can create it manually:
   ```
   SELECT * FROM pg_create_logical_replication_slot('slt_cidade_pub', 'pgoutput');
   ```
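
When choosing a slot name by hand, note that PostgreSQL only accepts lower-case letters, digits, and underscores in replication slot names. The following small Python sketch checks a candidate name before it is used in the call above; the helper name `is_valid_slot_name` is hypothetical, and the 63-character limit assumes a server built with the default identifier length.

```python
import re

# Lower-case letters, digits and underscores only.
# The 63-character cap assumes the default NAMEDATALEN of 64.
SLOT_NAME = re.compile(r"[a-z0-9_]{1,63}")

def is_valid_slot_name(name):
    """Return True if the name looks acceptable as a replication slot name."""
    return SLOT_NAME.fullmatch(name) is not None

if __name__ == "__main__":
    for candidate in ("slt_cidade_pub", "Slt-Cidade-Pub"):
        print(candidate, is_valid_slot_name(candidate))
```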
   Configure the NiFi CDC PostgreSQL processor to connect to this PostgreSQL instance. Example:
   ```
   PostgreSQL Host: 127.0.0.1
   PostgreSQL Driver Class Name: org.postgresql.Driver
   PostgreSQL Driver Location(s): /nifi-1.11.0/jdbc/postgresql-42.2.9.jar
   Database: postgres
   Username: postgres
   Password: 123123
   Publication: cidade_pub
   Slot Name: slt_cidade_pub
   Make Snapshot: false
   Include Begin/Commit Events: false
   Initial Log Sequence Number - LSN: [empty property]
   Drop If Exists Replication Slot: false
   ```
   
   Connect the processor to a downstream processor, for example LogAttribute, then start CaptureChangePostgreSQL.
   
   Then execute the following on the PostgreSQL server:
   
   ```
   INSERT INTO cidade (codigo, data_fund, nome) VALUES (4, now(), 'New York');
   UPDATE cidade SET codigo = 20 WHERE codigo = 4;
   UPDATE cidade SET nome = 'Statue of Liberty City' WHERE nome = 'New York';
   DELETE FROM cidade WHERE codigo >= 4;
   ```
   
   Output in the NiFi queue:
   ```
   {"insert":{"cidade":{"codigo":4,"nome":"New York","data_fund":"2020-02-24"}}}
   {"update":{"cidade":{"codigo":20,"nome":"New York","data_fund":"2020-02-24"}}}
   {"update":{"cidade":{"codigo":20,"nome":"Statue of Liberty City","data_fund":"2020-02-24"}}}
   {"delete":{"cidade":{"codigo":20,"nome":"Statue of Liberty City","data_fund":"2020-02-24"}}}
   ```
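
For anyone consuming these events downstream, here is a minimal Python sketch that splits one event into its operation, table name, and row. It assumes every event has exactly the shape shown in the output above (one operation key wrapping one table key); the function name `parse_event` is hypothetical, and events produced with begin/commit events enabled may look different.

```python
import json

def parse_event(line):
    """Split one event line into (operation, table, row).

    Assumes the shape {"<operation>": {"<table>": {<column>: <value>, ...}}}.
    """
    event = json.loads(line)
    ((operation, payload),) = event.items()  # exactly one operation key
    ((table, row),) = payload.items()        # exactly one table key
    return operation, table, row

SAMPLE = [
    '{"insert":{"cidade":{"codigo":4,"nome":"New York","data_fund":"2020-02-24"}}}',
    '{"delete":{"cidade":{"codigo":20,"nome":"Statue of Liberty City","data_fund":"2020-02-24"}}}',
]

if __name__ == "__main__":
    for line in SAMPLE:
        operation, table, row = parse_event(line)
        print(operation, table, row["codigo"], row["nome"])
```

The single-element unpacking raises a `ValueError` if an event carries more than one operation or table, which makes a shape mismatch visible instead of silently dropping data.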
   ----------
