[ 
https://issues.apache.org/jira/browse/FLINK-35521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852866#comment-17852866
 ] 

EMERSON WANG edited comment on FLINK-35521 at 6/6/24 4:52 PM:
--------------------------------------------------------------

@luoyuxia:
 
We output data to the S3 folders using a 1-minute window. For example, the 
Parquet files for ts_min=00 and ts_min=01
are as follows:

ts_min=00:
s3://.../dt=2024-06-06/ts_hr=16/ts_min=00/_SUCCESS
s3://.../dt=2024-06-06/ts_hr=16/ts_min=00/part-c6f5badd-6a38-45c1-8351-06d31605bc1d-0-36608
s3://.../dt=2024-06-06/ts_hr=16/ts_min=00/part-c6f5badd-6a38-45c1-8351-06d31605bc1d-0-36609
s3://.../dt=2024-06-06/ts_hr=16/ts_min=00/part-c6f5badd-6a38-45c1-8351-06d31605bc1d-1-36633
s3://.../dt=2024-06-06/ts_hr=16/ts_min=00/part-c6f5badd-6a38-45c1-8351-06d31605bc1d-1-36634

ts_min=01:
s3://.../dt=2024-06-06/ts_hr=16/ts_min=01/_SUCCESS
s3://.../dt=2024-06-06/ts_hr=16/ts_min=01/part-c6f5badd-6a38-45c1-8351-06d31605bc1d-0-36610
s3://.../dt=2024-06-06/ts_hr=16/ts_min=01/part-c6f5badd-6a38-45c1-8351-06d31605bc1d-0-36611
s3://.../dt=2024-06-06/ts_hr=16/ts_min=01/part-c6f5badd-6a38-45c1-8351-06d31605bc1d-1-36635
s3://.../dt=2024-06-06/ts_hr=16/ts_min=01/part-c6f5badd-6a38-45c1-8351-06d31605bc1d-1-36636

The _SUCCESS file in each of the two folders above was generated multiple 
times.
Because the _SUCCESS file is used to trigger the downstream AWS Lambda that 
processes the Parquet files in each folder, 
the Lambda ended up processing some Parquet files multiple times per folder.
We don't want to use additional resources to track which Parquet files have 
already been processed each time the Lambda is triggered by the _SUCCESS file.
Flink should support another use case: generating only one success file for 
each window, after all the partitions are committed and all output files are 
ready for the downstream application, so that the downstream application is 
triggered only once.
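
For reference, here is a minimal sketch (in Flink SQL) of the kind of 
filesystem sink that produces the folder layout above. The table name, 
columns, and path are placeholders rather than our actual job definition; the 
partition-commit options are the ones described in the issue below.

-- Hypothetical sink: one partition (folder) per 1-minute window.
CREATE TABLE s3_sink (
    user_id   STRING,
    event_cnt BIGINT,
    dt        STRING,
    ts_hr     STRING,
    ts_min    STRING
) PARTITIONED BY (dt, ts_hr, ts_min) WITH (
    'connector' = 'filesystem',
    'path'      = 's3://.../output',      -- placeholder path
    'format'    = 'parquet',
    -- Derive the partition's event time from the partition values.
    'partition.time-extractor.timestamp-pattern' = '$dt $ts_hr:$ts_min:00',
    -- Commit a partition once the watermark passes its time plus the delay.
    'sink.partition-commit.trigger' = 'partition-time',
    'sink.partition-commit.delay'   = '1 min',
    -- On each partition commit, write a _SUCCESS file into the folder.
    'sink.partition-commit.policy.kind' = 'success-file'
);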


> Flink FileSystem SQL Connector Generating SUCCESS File Multiple Times
> ---------------------------------------------------------------------
>
>                 Key: FLINK-35521
>                 URL: https://issues.apache.org/jira/browse/FLINK-35521
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>    Affects Versions: 1.18.1
>         Environment: Our PyFlink SQL jobs are running in AWS EKS environment.
>            Reporter: EMERSON WANG
>            Priority: Major
>
> Our Flink Table SQL job received data from the Kafka streams and then wrote 
> all partitioned data into the associated Parquet files under the same S3 
> folder through the FileSystem SQL connector.
> For the S3 filesystem SQL connector, sink.partition-commit.policy.kind was 
> set to 'success-file' and sink.partition-commit.trigger was set to 
> 'partition-time'. We found that the _SUCCESS file in the S3 folder was 
> generated multiple times after multiple partitions were committed.
> Because all partitioned Parquet files and the _SUCCESS file are in the same 
> S3 folder, and the _SUCCESS file is used to trigger the downstream 
> application, we would really like the _SUCCESS file to be generated only 
> once, after all partitions are committed and all Parquet files are ready to 
> be processed. That way, a single _SUCCESS file triggers the downstream 
> application only once instead of multiple times.
> We knew we could set sink.partition-commit.trigger to 'process-time' to 
> generate the _SUCCESS file only once in the S3 folder; however, 
> 'process-time' would not meet our business requirements.
> We'd request that the FileSystem SQL connector support the following new 
> use case:
> Even if sink.partition-commit.trigger is set to 'partition-time', the 
> _SUCCESS file should be generated only once, after all partitions are 
> committed and all output files are ready to be processed, so that it 
> triggers the downstream application only once instead of multiple times.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
