[ 
https://issues.apache.org/jira/browse/FLINK-17505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17136731#comment-17136731
 ] 

Oleksandr Nitavskyi edited comment on FLINK-17505 at 6/16/20, 3:19 PM:
-----------------------------------------------------------------------

MergeOperartor (ConsolidationOperator) will not be able to replace files 
atomically(at least on HDFS), so some isolation can be violated. For this the 
possible solution would be to produce data to some temp directory (aka [temp 
bucket 
|https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.kpgruelafx8h]),
 then the Consolidation operator will merge small files depending on the format 
and move files to the final destination in the background. Unfortunately for S3 
FS, it would require the copying of the data.
Also even in case if the merge will not change any files and simply moves files 
to the final destination we can a useful feature to the FileStreeamingSink 
outputs. Since currently, any consumers of the files produced by Flink should 
filter files without suffixes(which neither .in-progress no .pending). Probably 
we want to move this logic on the Flink side.

*Problems:*
1. When to start file consolidation? Ideally, we want to perform merge 
iteration once the files were renamed from pending. Which is performed once the 
checkpoint is done or upon the recovery. But it is not obvious how reliably 
react to such events in another operator. So we probably want to merge files 
periodically on some timer with some configurable period (probably similar to 
the checkpoint interval).

2. When files should actually be merged? There are at least two cases when 
files should be merged together and moved to the final directory:
    * The desirable size of the input files is achieved.
    * The bucket is closed. E.g. in case of time series export we probably 
should be able to compare time associated with a bucket and current watermark 
(if any). So it should be decided by bucketAssigner and bucketContext.

3. When files should be moved?
    * Once they achieve desired file size (or when they were actually merged by 
achieving the desirable input files size)
    * When the bucket is actually closed. E.g. it is a time series bucket and 
BucketAssigner with a bucket context can suppose that the bucket is closed. 
More detailed thoughts about Meta-Info has been precise in this technical doc 
https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.b8ibdmatt6t3
    * When the bucket is not yet closed (or never will be), but certain 
inactivity time has passed. 

4. How to handle the failures? MergeOperator should perform merging and then 
move merged files to the final directories. Since this operation cannot be made 
atomically and mutates the state on FS we should ensure idempotence of the 
merge/move/source removal operation. For this, we can store some state 
describing the mutation plan of the input files. We can use Flink State for 
this or persist the transaction plan on output FS.

5. How to share files from different slots for merging? We probably want to 
keep the same parallelism as FileStreamingSink. And MergeOperators should 
consider only the files produced by the Sink from the same slot. In this case 
on bucket closing if we want to keep the optimal output size we should make 
another consolidation strategy. So in order to keep efficiency, we want to 
perform merge operations in parallel.

6. How to discover files which should be merged? Such files are known by Bucket 
class. A possible solution is to forward all newly created filenames to the 
MergeOperator. Another solution is simply to list open buckets periodically. In 
case we have high parallelism we risk creating unnecessary load on the 
underlying file system. So for this operation, we would prefer to have a 
parallelism = 1.

7. Should we split files if they are too big? Probably the problem of the big 
files should be addressed by the proper Checkpoint Policy.


was (Author: oleksandr nitavskyi):
MergeOperartor (ConsolidationOperator) will not be able to replace files 
atomically(at least on HDFS), so some isolation can be violated. For this the 
possible solution would be to produce data to some temp directory (aka [temp 
bucket 
|https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.kpgruelafx8h]),
 then the Consolidation operator will merge small files depending on the format 
and move files to the final destination in the background. Unfortunately for S3 
FS, it would require the copying of the data.
Also even in case if the merge will not change any files and simply moves files 
to the final destination we can a useful feature to the FileStreeamingSink 
outputs. Since currently, any consumers of the files produced by Flink should 
filter files without suffixes(which neither .in-progress no .pending). Probably 
we want to move this logic on the Flink side.

Problems:
1. When to start file consolidation? Ideally, we want to perform merge 
iteration once the files were renamed from pending. Which is performed once the 
checkpoint is done or upon the recovery. But it is not obvious how reliably 
react to such events in another operator. So we probably want to merge files 
periodically on some timer with some configurable period (probably similar to 
the checkpoint interval).
2. When files should actually be merged? There are at least two cases when 
files should be merged together and moved to the final directory:
    1. The desirable size of the input files is achieved.
    2. The bucket is closed. E.g. in case of time series export we probably 
should be able to compare time associated with a bucket and current watermark 
(if any). So it should be decided by bucketAssigner and bucketContext.
3. When files should be moved?
    * Once they achieve desired file size (or when they were actually merged by 
achieving the desirable input files size)
    * When the bucket is actually closed. E.g. it is a time series bucket and 
BucketAssigner with a bucket context can suppose that the bucket is closed. 
More detailed thoughts about Meta-Info has been precise in this technical doc 
https://docs.google.com/document/d/1VOT12gPctXew58EyXqIEnNV2N3IMiWFYSL4b41oGQ-c/edit#heading=h.b8ibdmatt6t3
    * When the bucket is not yet closed (or never will be), but certain 
inactivity time has passed. 
4. How to handle the failures? MergeOperator should perform merging and then 
move merged files to the final directories. Since this operation cannot be made 
atomically and mutates the state on FS we should ensure idempotence of the 
merge/move/source removal operation. For this, we can store some state 
describing the mutation plan of the input files. We can use Flink State for 
this or persist the transaction plan on output FS.
5. How to share files from different slots for merging? We probably want to 
keep the same parallelism as FileStreamingSink. And MergeOperators should 
consider only the files produced by the Sink from the same slot. In this case 
on bucket closing if we want to keep the optimal output size we should make 
another consolidation strategy. So in order to keep efficiency, we want to 
perform merge operations in parallel.
6. How to discover files which should be merged? Such files are known by Bucket 
class. A possible solution is to forward all newly created filenames to the 
MergeOperator. Another solution is simply to list open buckets periodically. In 
case we have high parallelism we risk creating unnecessary load on the 
underlying file system. So for this operation, we would prefer to have a 
parallelism = 1.
7. Should we split files if they are too big? Probably the problem of the big 
files should be addressed by the proper Checkpoint Policy.

> Merge small files produced by StreamingFileSink
> -----------------------------------------------
>
>                 Key: FLINK-17505
>                 URL: https://issues.apache.org/jira/browse/FLINK-17505
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>    Affects Versions: 1.10.0
>            Reporter: Piotr Nowojski
>            Priority: Major
>
> This an alternative approach to FLINK-11499, to solve a problem of creating 
> many small files with bulk formats in StreamingFileSink (which have to be 
> rolled on checkpoint).
> Merge based approach would require converting {{StreamingFileSink}} from a 
> sink, to an operator, that would be working exactly as it’s working right 
> now, with the same limitations (no support for arbitrary rolling policies for 
> bulk formats), followed by another operator that would be tasked with merging 
> small files in the background. 
> In the long term we probably would like to have both merge operator and write 
> ahead log solution (WAL described in FLINK-11499) as alternatives, as WAL 
> would behave better if small files are more common, and merge operator could 
> behave better if small files are rare (because of data skew for example).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to