francocalvo opened a new issue, #2267: URL: https://github.com/apache/fluss/issues/2267
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/fluss/issues) and found nothing similar.

### Fluss version

0.8.0 (latest release)

### Please describe the bug 🐞

I'm trying to build a simple PoC that takes data from a PostgreSQL table into a Fluss table via Flink CDC, with streaming into Paimon enabled. This is all local for now. Paimon uses Garage (an S3-compatible API) for the warehouse and PostgreSQL as the catalog.

This is my initial job in Flink:

<details>
<summary> FlinkSQL Job for CDC into Fluss table </summary>

```
-- Run streaming job + checkpointing for EOS
SET 'execution.runtime-mode' = 'streaming';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';

-- 1) Fluss catalog (point to coordinator + tablet; Fluss supports comma-separated bootstrap servers)
CREATE CATALOG fluss_catalog WITH (
  'type' = 'fluss',
  'bootstrap.servers' = '192.168.1.4:9123,192.168.1.4:9124'
);

USE CATALOG fluss_catalog;

CREATE DATABASE IF NOT EXISTS osb_staging;
USE osb_staging;

-- DROP TABLE IF EXISTS tickets_staging;

-- 2) Fluss staging table (primary-key table)
CREATE TABLE IF NOT EXISTS tickets_staging (
  ticket_id BIGINT,
  movie_id BIGINT,
  user_id BIGINT,
  cost DECIMAL(10, 2),
  purchased_at TIMESTAMP(3),
  PRIMARY KEY (ticket_id) NOT ENFORCED
) WITH (
  'bucket.num' = '4',
  'table.datalake.enabled' = 'true',
  'table.datalake.freshness' = '30s'
);

-- 3) Postgres CDC source (Flink CDC SQL connector)
-- The connector options shown here are the documented ones.
CREATE TEMPORARY TABLE pg_osb_tickets (
  ticket_id BIGINT,
  movie_id BIGINT,
  user_id BIGINT,
  cost DECIMAL(10, 2),
  purchased_at TIMESTAMP(3),
  PRIMARY KEY (ticket_id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '192.168.1.4',
  'port' = '5432',
  'username' = 'root',
  'password' = 'root',
  'database-name' = 'source_db',
  'schema-name' = 'osb',
  'table-name' = 'tickets',
  'slot.name' = 'cdc_osb_tickets_to_fluss',
  'decoding.plugin.name' = 'pgoutput',
  'scan.incremental.snapshot.enabled' = 'true'
);

-- 4) Start the replication stream into Fluss
INSERT INTO tickets_staging
SELECT ticket_id, movie_id, user_id, cost, purchased_at
FROM pg_osb_tickets;
```

</details>

Then, I start the tiering service like this:

```
docker exec flink-jobmanager /opt/flink/bin/flink run \
  -Dpipeline.name="Fluss Tiering Service" \
  -Dparallelism.default=2 \
  -Dstate.checkpoints.dir="s3://warehouse/checkpoints/tiering" \
  -Ds3.multiobjectdelete.enable=false \
  /opt/flink/lib/fluss-flink-tiering-0.8.0-incubating.jar \
  --fluss.bootstrap.servers 192.168.1.4:9123 \
  --datalake.format paimon \
  --datalake.paimon.metastore jdbc \
  --datalake.paimon.uri "jdbc:postgresql://192.168.1.4:5433/paimon_catalog" \
  --datalake.paimon.jdbc.user root \
  --datalake.paimon.jdbc.password root \
  --datalake.paimon.catalog-key paimon_catalog \
  --datalake.paimon.warehouse "s3://warehouse/paimon" \
  --datalake.paimon.s3.endpoint "http://192.168.1.4:3900" \
  --datalake.paimon.s3.access-key "${GARAGE_ACCESS_KEY}" \
  --datalake.paimon.s3.secret-key "${GARAGE_SECRET_KEY}" \
  --datalake.paimon.s3.path.style.access true
```

So far, it _almost_ works. I can see data in real time, and when I go to the bucket, I see three types of directories for the table: bucket directories (3 or 4), one schema directory, and one manifest directory. **But no snapshots!** Because of this, no matter how long I wait, I can't get data from a `SELECT * FROM tickets_staging$lake`.
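For completeness, here's a way to check the Paimon side directly from Flink SQL, bypassing the Fluss `$lake` shortcut. This is only a sketch: the catalog name `paimon_check` is made up, and the options just mirror the tiering arguments above, so adjust as needed:

```
-- Hypothetical cross-check: open the same Paimon JDBC catalog the tiering
-- service writes to and list committed snapshots for the mirrored table.
CREATE CATALOG paimon_check WITH (
  'type' = 'paimon',
  'metastore' = 'jdbc',
  'uri' = 'jdbc:postgresql://192.168.1.4:5433/paimon_catalog',
  'jdbc.user' = 'root',
  'jdbc.password' = 'root',
  'catalog-key' = 'paimon_catalog',
  'warehouse' = 's3://warehouse/paimon',
  's3.endpoint' = 'http://192.168.1.4:3900',
  's3.access-key' = '${GARAGE_ACCESS_KEY}',
  's3.secret-key' = '${GARAGE_SECRET_KEY}',
  's3.path.style.access' = 'true'
);

USE CATALOG paimon_check;

-- Paimon's $snapshots system table stays empty until a snapshot is committed;
-- backticks are needed in Flink SQL because of the '$' in the name.
SELECT snapshot_id, commit_kind, commit_time
FROM osb_staging.`tickets_staging$snapshots`;
```

One thing I'm unsure about: the tiering `flink run` command above never sets `execution.checkpointing.interval`. If the tiering job only commits Paimon snapshots on checkpoint completion (as Flink sinks usually do), that might explain data and manifest files showing up without any snapshot, but I haven't confirmed this.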
### Solution

_No response_

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
