georgelza commented on issue #3342:
URL: https://github.com/apache/polaris/issues/3342#issuecomment-3705776446

   hi hi,
   
   Here are some examples as executed in Flink.
   
   Flink has the concept of a generic in-memory catalog, which is only visible to the specific session in which it was created.
   
   ```
   CREATE CATALOG c_cdcsource WITH 
       ('type'='generic_in_memory');
   
   USE CATALOG c_cdcsource;
   
   -- Source for PyFlink
   CREATE DATABASE IF NOT EXISTS demog;
   ```
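   
   You can confirm the catalog exists for the current session (a quick check; `default_catalog` is Flink's built-in default):
   
   ```
   SHOW CATALOGS;
   -- default_catalog
   -- c_cdcsource
   ```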
   
   Inside this catalog we can then create a table as follows.
   
   Remember, this only lives for that Flink session: you can't exit and come back. If you do, you have to redo the above.
   
   **PostgreSQL CDC Source**
   
   ```
   USE CATALOG c_cdcsource;
   CREATE DATABASE IF NOT EXISTS demog;  
   USE demog;
   
   CREATE OR REPLACE TABLE accountholders (
        _id                BIGINT                  NOT NULL
       ,nationalid         VARCHAR(16)             NOT NULL
       ,firstname          VARCHAR(100)
       ,lastname           VARCHAR(100)
       ,dob                VARCHAR(10) 
       ,gender             VARCHAR(10)
       ,children           INT
       ,address            STRING
       ,accounts           STRING
       ,emailaddress       VARCHAR(100)
       ,mobilephonenumber  VARCHAR(20)
       ,created_at         TIMESTAMP_LTZ(3)
       ,WATERMARK          FOR created_at AS created_at - INTERVAL '15' SECOND
       ,PRIMARY KEY (_id) NOT ENFORCED
   ) WITH (
        'connector'                           = 'postgres-cdc'
       ,'hostname'                            = 'postgrescdc'
       ,'port'                                = '5432'
       ,'username'                            = 'dbadmin'
       ,'password'                            = 'dbpassword'
       ,'database-name'                       = 'demog'
       ,'schema-name'                         = 'public'
       ,'table-name'                          = 'accountholders'
        ,'slot.name'                           = 'accountholders_pyflink'  -- slot names can't include capital letters
       ,'scan.incremental.snapshot.enabled'   = 'true'               
       ,'scan.startup.mode'                   = 'initial'            
       ,'decoding.plugin.name'                = 'pgoutput'
   );
   ```
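   
   Once created, the table can be queried from anywhere in the session using its fully qualified name (a quick smoke test; column list trimmed for brevity):
   
   ```
   SELECT _id, nationalid, firstname, lastname, created_at
   FROM c_cdcsource.demog.accountholders;
   ```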
   
   **Kafka Source**
   
   ```
   CREATE OR REPLACE TABLE c_cdcsource.demog.t_salescompleted (
       INVNUMBER STRING,
       SALEDATETIME_LTZ STRING,
       SALETIMESTAMP_EPOC STRING,
       TERMINALPOINT STRING,
       NETT DOUBLE,
       VAT DOUBLE,
       TOTAL DOUBLE,
       STORE row<ID STRING, NAME STRING>,
       CLERK row<ID STRING, NAME STRING, SURNAME STRING>,
       BASKETITEMS array<row<ID STRING, NAME STRING, BRAND STRING, CATEGORY STRING, PRICE DOUBLE, QUANTITY INT>>,
       FINTRANSACTIONID STRING,
       PAYDATETIME_LTZ STRING,
       PAYTIMESTAMP_EPOC STRING,
       PAID DOUBLE,
       SALESTIMESTAMP_WM AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`SALETIMESTAMP_EPOC` AS BIGINT) / 1000)),
       WATERMARK FOR SALESTIMESTAMP_WM AS SALESTIMESTAMP_WM
   ) WITH (
       'connector'                     = 'kafka',
       'topic'                         = 'avro_salescompleted',
       'properties.bootstrap.servers'  = 'broker:29092',
       'scan.startup.mode'             = 'earliest-offset',
       'properties.group.id'           = 'testGroup',
       'value.format'                  = 'avro-confluent',
       'value.avro-confluent.schema-registry.url' = 'http://schema-registry:9081',
       'value.fields-include'          = 'ALL'
   );
   ```
   
   The same can be configured against MySQL, MongoDB, and other lakehouse table formats like Iceberg, Paimon, Hudi, etc. as CDC sources; a MySQL sketch follows below.
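   
   For illustration, a MySQL equivalent of the Postgres table above would look roughly like this (the hostname, credentials, and column list here are placeholders, not from my actual setup):
   
   ```
   CREATE OR REPLACE TABLE accountholders_mysql (
        _id        BIGINT        NOT NULL
       ,firstname  VARCHAR(100)
       ,lastname   VARCHAR(100)
       ,PRIMARY KEY (_id) NOT ENFORCED
   ) WITH (
        'connector'          = 'mysql-cdc'
       ,'hostname'           = 'mysqlcdc'
       ,'port'               = '3306'
       ,'username'           = 'dbadmin'
       ,'password'           = 'dbpassword'
       ,'database-name'      = 'demog'
       ,'table-name'         = 'accountholders'
       ,'scan.startup.mode'  = 'initial'
   );
   ```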
   
   What I'm hoping is that the "in memory" bit can be "dropped", and that what's created in the "generic" catalog is actually persisted in a catalog store. Polaris/REST would be great; JDBC is also a convenient option, but hey, we are in the Polaris world... ;)
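   
   For comparison, Flink can already reach Polaris for Iceberg tables through the Iceberg REST catalog; the ask here is the same kind of persistence for the generic/connector tables above. A rough sketch of the existing Iceberg path (the URI, warehouse, and credential values are placeholders):
   
   ```
   CREATE CATALOG c_polaris WITH (
        'type'          = 'iceberg'
       ,'catalog-type'  = 'rest'
       ,'uri'           = 'http://polaris:8181/api/catalog'
       ,'warehouse'     = 'demog'
       ,'credential'    = '<client_id>:<client_secret>'
       ,'scope'         = 'PRINCIPAL_ROLE:ALL'
   );
   ```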
   
   hope this clarifies it.
   
   G

