georgelza commented on issue #3342:
URL: https://github.com/apache/polaris/issues/3342#issuecomment-3705776446
hi hi,
Here are some examples as executed in Flink.
Flink has the concept of a generic in-memory catalog, which is only visible to
the specific session in which it was created.
```
CREATE CATALOG c_cdcsource WITH ('type'='generic_in_memory');
USE CATALOG c_cdcsource;
-- Source for PyFlink
CREATE DATABASE IF NOT EXISTS demog;
```
Inside this catalog we can then create a table as follows. Remember, it only
exists for that Flink session, so you can't exit and come back; if you do, you
have to redo the above.
**PostgreSQL CDC Source**
```
USE CATALOG c_cdcsource;
CREATE DATABASE IF NOT EXISTS demog;
USE demog;
CREATE OR REPLACE TABLE accountholders (
_id BIGINT NOT NULL
,nationalid VARCHAR(16) NOT NULL
,firstname VARCHAR(100)
,lastname VARCHAR(100)
,dob VARCHAR(10)
,gender VARCHAR(10)
,children INT
,address STRING
,accounts STRING
,emailaddress VARCHAR(100)
,mobilephonenumber VARCHAR(20)
,created_at TIMESTAMP_LTZ(3)
,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
,PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc'
,'hostname' = 'postgrescdc'
,'port' = '5432'
,'username' = 'dbadmin'
,'password' = 'dbpassword'
,'database-name' = 'demog'
,'schema-name' = 'public'
,'table-name' = 'accountholders'
,'slot.name' = 'accountholders_pyflink'
-- slot names can't include capital letters
,'scan.incremental.snapshot.enabled' = 'true'
,'scan.startup.mode' = 'initial'
,'decoding.plugin.name' = 'pgoutput'
);
```
**Kafka Source**
```
CREATE OR REPLACE TABLE c_cdcsource.demog.t_salescompleted (
INVNUMBER STRING,
SALEDATETIME_LTZ STRING,
SALETIMESTAMP_EPOC STRING,
TERMINALPOINT STRING,
NETT DOUBLE,
VAT DOUBLE,
TOTAL DOUBLE,
STORE row<ID STRING, NAME STRING>,
CLERK row<ID STRING, NAME STRING, SURNAME STRING>,
BASKETITEMS array<row<ID STRING, NAME STRING, BRAND STRING, CATEGORY STRING, PRICE DOUBLE, QUANTITY INT>>,
FINTRANSACTIONID STRING,
PAYDATETIME_LTZ STRING,
PAYTIMESTAMP_EPOC STRING,
PAID DOUBLE,
SALESTIMESTAMP_WM as TO_TIMESTAMP(FROM_UNIXTIME(CAST(`SALETIMESTAMP_EPOC` AS BIGINT) / 1000)),
WATERMARK FOR SALESTIMESTAMP_WM AS SALESTIMESTAMP_WM
) WITH (
'connector' = 'kafka',
'topic' = 'avro_salescompleted',
'properties.bootstrap.servers' = 'broker:29092',
'scan.startup.mode' = 'earliest-offset',
'properties.group.id' = 'testGroup',
'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://schema-registry:9081',
'value.fields-include' = 'ALL'
);
```
The same can be configured against MySQL, MongoDB, and other lakehouse tables
like Iceberg, Paimon, Hudi, etc. as CDC sources; see the MySQL sketch below.
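For illustration, a MySQL equivalent of the PostgreSQL source above might look
like the following. This is a minimal sketch only; the hostname, credentials,
and database/table names are placeholder assumptions, not from my actual setup.
```
-- Sketch: a trimmed-down MySQL CDC source table (placeholder connection details).
CREATE OR REPLACE TABLE accountholders_mysql (
_id BIGINT NOT NULL
,firstname VARCHAR(100)
,lastname VARCHAR(100)
,created_at TIMESTAMP_LTZ(3)
,PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc'
,'hostname' = 'mysqlcdc'
,'port' = '3306'
,'username' = 'dbadmin'
,'password' = 'dbpassword'
,'database-name' = 'demog'
,'table-name' = 'accountholders'
,'scan.startup.mode' = 'initial'
);
```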
What I'm hoping is that the "in memory" bit can be "dropped" and what's
created in the "Generic" catalog is actually persisted inside a catalog store.
Polaris/REST would be great; JDBC is also a convenient option, but hey, we are
in the Polaris world... ;)
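For context, Flink's Iceberg integration can already point a catalog at a REST
catalog service such as Polaris, roughly as in the sketch below (the URI and
warehouse name are assumptions for a local setup). The limitation today is that
such a catalog only persists Iceberg tables, not generic connector tables like
the postgres-cdc and kafka ones above, which is exactly the gap described here.
```
-- Sketch only: an Iceberg REST catalog backed by Polaris (URI/warehouse are placeholders).
-- This persists Iceberg tables, but not generic connector tables, which is the ask here.
CREATE CATALOG c_polaris WITH (
'type' = 'iceberg'
,'catalog-type' = 'rest'
,'uri' = 'http://polaris:8181/api/catalog'
,'warehouse' = 'demo_catalog'
);
```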
Hope this clarifies it.
G