francocalvo opened a new issue, #2267:
URL: https://github.com/apache/fluss/issues/2267

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/fluss/issues) and 
found nothing similar.
   
   
   ### Fluss version
   
   0.8.0 (latest release)
   
   ### Please describe the bug 🐞
   
   I'm trying to create a simple PoC that takes data from a PostgreSQL table into a Fluss table using Flink CDC and enables streaming into Paimon. This is all local for now.
   
   Paimon uses Garage (S3-compatible API) for the warehouse and PostgreSQL as the catalog.
   This is my initial job in Flink:
   
   
   <details>
   
   <summary> FlinkSQL Job for CDC into Fluss table </summary>
   
   ```
   -- Run streaming job + checkpointing for EOS
   SET 'execution.runtime-mode' = 'streaming';
   SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
   
   -- 1) Fluss catalog (points at the coordinator + tablet servers; Fluss supports comma-separated bootstrap servers)
   CREATE CATALOG fluss_catalog WITH (
       'type' = 'fluss',
       'bootstrap.servers' = '192.168.1.4:9123,192.168.1.4:9124'
   );
   USE CATALOG fluss_catalog;
   
   CREATE DATABASE IF NOT EXISTS osb_staging;
   USE osb_staging;
   
   -- DROP TABLE IF EXISTS tickets_staging;
   
   -- 2) Fluss staging table (append-only log table)
   CREATE TABLE IF NOT EXISTS tickets_staging (
       ticket_id bigint,
       movie_id bigint,
       user_id bigint,
       cost DECIMAL(10, 2),
       purchased_at timestamp(3),
       PRIMARY KEY (ticket_id) NOT ENFORCED
   )
   WITH (
       'bucket.num' = '4',
       'table.datalake.enabled' = 'true',
       'table.datalake.freshness' = '30s'
   );
   
   
   -- 3) Postgres CDC source (Flink CDC SQL connector)
   -- The connector options shown here are the documented ones.
   
   CREATE TEMPORARY TABLE pg_osb_tickets (
     ticket_id BIGINT,
     movie_id BIGINT,
     user_id BIGINT,
     cost DECIMAL(10,2),
     purchased_at TIMESTAMP(3),
     PRIMARY KEY (ticket_id) NOT ENFORCED
   ) WITH (
     'connector' = 'postgres-cdc',
     'hostname' = '192.168.1.4',
     'port' = '5432',
     'username' = 'root',
     'password' = 'root',
     'database-name' = 'source_db',
     'schema-name' = 'osb',
     'table-name' = 'tickets',
     'slot.name' = 'cdc_osb_tickets_to_fluss',
     'decoding.plugin.name' = 'pgoutput',
     'scan.incremental.snapshot.enabled' = 'true'
   );
   
   
   -- 4) Start the replication stream into Fluss
   INSERT INTO tickets_staging
   SELECT
       ticket_id,
       movie_id,
       user_id,
       cost,
       purchased_at
   FROM
       pg_osb_tickets;
   ```
   
   </details>
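
   For what it's worth, data does reach the Fluss table: a plain streaming read in the Flink SQL client shows the CDC rows (the `LIMIT` just bounds the output):

   ```
   -- Streaming read of the Fluss table from the Flink SQL client.
   USE CATALOG fluss_catalog;
   USE osb_staging;
   SELECT * FROM tickets_staging LIMIT 10;
   ```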
   
   
   Then, I start the tiering service like this:
   
   ```
   docker exec flink-jobmanager /opt/flink/bin/flink run \
     -Dpipeline.name="Fluss Tiering Service" \
     -Dparallelism.default=2 \
     -Dstate.checkpoints.dir="s3://warehouse/checkpoints/tiering" \
     -Ds3.multiobjectdelete.enable=false \
     /opt/flink/lib/fluss-flink-tiering-0.8.0-incubating.jar \
     --fluss.bootstrap.servers 192.168.1.4:9123 \
     --datalake.format paimon \
     --datalake.paimon.metastore jdbc \
     --datalake.paimon.uri "jdbc:postgresql://192.168.1.4:5433/paimon_catalog" \
     --datalake.paimon.jdbc.user root \
     --datalake.paimon.jdbc.password root \
     --datalake.paimon.catalog-key paimon_catalog \
     --datalake.paimon.warehouse "s3://warehouse/paimon" \
     --datalake.paimon.s3.endpoint "http://192.168.1.4:3900" \
     --datalake.paimon.s3.access-key "${GARAGE_ACCESS_KEY}" \
     --datalake.paimon.s3.secret-key "${GARAGE_SECRET_KEY}" \
     --datalake.paimon.s3.path.style.access true
   ```
   
   So far, it _almost_ works. I can see data in real time, and when I go to the bucket, I see three types of directories for the table: bucket directories (3 or 4), one schema directory, and one manifest directory. **But no snapshots** (the `snapshot/` directory Paimon normally creates next to `schema/` and `manifest/` never appears). Because of this, no matter how long I wait, I can't get any data from `SELECT * FROM tickets_staging$lake`.
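
   For reference, this is roughly how I inspect things from the Paimon side: a minimal sketch that opens the same JDBC catalog the tiering service writes to (`paimon_inspect` is just a name I picked, and I'm assuming the tiered table keeps the `osb_staging.tickets_staging` name) and queries Paimon's `$snapshots` system table:

   ```
   -- Open the Paimon catalog directly, with the same metastore, warehouse,
   -- and S3 options the tiering job was started with.
   CREATE CATALOG paimon_inspect WITH (
       'type' = 'paimon',
       'metastore' = 'jdbc',
       'uri' = 'jdbc:postgresql://192.168.1.4:5433/paimon_catalog',
       'jdbc.user' = 'root',
       'jdbc.password' = 'root',
       'catalog-key' = 'paimon_catalog',
       'warehouse' = 's3://warehouse/paimon',
       's3.endpoint' = 'http://192.168.1.4:3900',
       's3.access-key' = '...',   -- Garage access key
       's3.secret-key' = '...',   -- Garage secret key
       's3.path.style.access' = 'true'
   );
   USE CATALOG paimon_inspect;

   -- Paimon's $snapshots system table lists committed snapshots; given the
   -- missing snapshot/ directory I'd expect this to return nothing.
   SELECT snapshot_id, commit_kind, commit_time
   FROM osb_staging.`tickets_staging$snapshots`;
   ```

   If I read the Fluss docs right, the same snapshot metadata should also be reachable through the Fluss catalog as `tickets_staging$lake$snapshots`.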
   
   ### Solution
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!

