Samrat002 commented on PR #21458:
URL: https://github.com/apache/flink/pull/21458#issuecomment-1703902505
I took an example in which a datagen table is created with two fields,
`fname` and `lname`. I also created a second table of type filesystem that
points to a specific S3 path and uses the `csv` format.
```
-- create a generator table
CREATE TABLE generator (
fname STRING,
lname STRING
) WITH (
'connector' = 'datagen'
);
-- create a sample dynamic table with the filesystem connector; it supports csv as a format
CREATE TABLE `name_table` (
`fname` STRING,
`lname` STRING
) with (
'connector'='filesystem',
'format' = 'csv',
'path' = 's3://dbsamrat-flink-dev/data/default/name_table'
);
-- run a job to insert data into the table (S3)
INSERT INTO name_table SELECT * FROM generator;
```
Here is the flink-conf file used for the cluster (these configs are also
picked up by the job).
Attaching the jobmanager log for the job that inserts data into the
CSV-formatted S3 path, which uses CsvBulkWriter and maintains a two-phase commit.
[jobmanager.log](https://github.com/apache/flink/files/12504473/insert_jobmanager.log)
Note that the two-phase commit happens when a checkpoint is triggered.
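Because the commit is tied to checkpoints, checkpointing has to be enabled for the insert job. A minimal sketch of how that might be set from the SQL client before running the `INSERT` is shown here; the interval value is an assumption for illustration and is not taken from the flink-conf used in this test.

```sql
-- Assumed values for illustration; not the settings used in the run above.
-- Checkpoints trigger the commit of pending files written by the filesystem sink.
SET 'execution.checkpointing.interval' = '1min';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
```

With a longer interval, files become visible in the S3 path less often, since pending files are only committed when a checkpoint completes.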
An additional job was executed separately to read data from name_table.
[count_jobmanager.log](https://github.com/apache/flink/files/12504475/count_jobmanager.log)
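The exact read query is not included in this comment. Judging only by the log file name, it was probably a simple count, so a sketch of such a job could look like the following; both the query and the `row_count` alias are assumptions.

```sql
-- Hypothetical read job: counts the rows committed to the S3 path so far.
SELECT COUNT(*) AS row_count FROM name_table;
```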
@dannycranmer @hlteoh37 please review whether this satisfies the
exactly-once guarantee.