[
https://issues.apache.org/jira/browse/FLINK-19706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216601#comment-17216601
]
Lsw_aka_laplace edited comment on FLINK-19706 at 10/19/20, 10:19 AM:
---------------------------------------------------------------------
[~lzljs3620320]
Hi Jingsong,
> We can not store all partitions, and we can not a cache only too.
Actually, approach 1 does not have this problem: it queries and checks whether
the partition exists, so there is no need to store every partition that has
already been committed. I also prefer approach 1 (please see one of the
implementations that I just posted).
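For concreteness, here is a minimal sketch of what "query and check the
partition" could look like against the Hive metastore. The wrapper class below
is an assumption made for illustration, not the posted patch; only the
HiveMetaStoreClient#getPartition call is real API:
{code:java}
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.thrift.TException;

import java.util.List;

// Illustrative helper (not part of the patch): asks the metastore whether a
// partition already exists, which is exactly the "has it been committed before"
// question that approach 1 needs answered.
public class MetastorePartitionChecker {

    private final HiveMetaStoreClient client; // assumed to be built from the table's HiveConf

    public MetastorePartitionChecker(HiveMetaStoreClient client) {
        this.client = client;
    }

    /** Returns true if the partition already exists in the metastore. */
    public boolean partitionExists(String db, String table, List<String> partitionValues)
            throws TException {
        try {
            client.getPartition(db, table, partitionValues);
            return true;
        } catch (NoSuchObjectException e) {
            // Not in the metastore yet, so this would be the first commit.
            return false;
        }
    }
}
{code}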
> What about change the way to late event processing?
Coincidentally, I had the same concern and ran some experiments, but
unfortunately they failed, for the following reasons:
# The StreamingFileWriter-related logic is complex (at least for me). It took
me a lot of time to understand the streaming file writing process, especially
writing to buckets, rolling, and committing. It is correspondingly harder to
modify this process or to insert user logic into it (a user who knows little
about it will easily make mistakes).
# How do we define the lateness of data, and how do we judge whether a given
element is late? The FileWriter knows nothing about partition existence (only
the committer does), so it cannot tell whether the partition that the current
element belongs to has been committed. What should we do if the data is late
according to the watermark but its partition has not been committed yet (which
means the data should not be treated as LATE DATA)? See the sketch after this
list. Moreover, it is hard to choose a proper lateness period: if it is too
small, too many elements will be regarded as LATE DATA, causing both poor
performance and fatal logic errors; if it is too big, this measure may not
solve lateness/repeated commits correctly.
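To make the ambiguity in item 2 concrete, here is the kind of purely time-based
lateness test a writer could apply. Every name and number in it is an
assumption for the example, not a Flink API:
{code:java}
public class NaiveLatenessCheck {

    /** "Late" purely by time: the watermark has passed the partition end plus the allowed lateness. */
    static boolean looksLate(long partitionEndTime, long allowedLateness, long currentWatermark) {
        return partitionEndTime + allowedLateness <= currentWatermark;
    }

    public static void main(String[] args) {
        long partitionEnd = 11 * 60 * 60 * 1000L;       // partition covering [10:00, 11:00)
        long lateness = 5 * 60 * 1000L;                 // 5 minutes of allowed lateness
        long watermark = partitionEnd + 6 * 60 * 1000L; // watermark already at 11:06
        System.out.println(looksLate(partitionEnd, lateness, watermark)); // true: "late" by time
        // But the writer cannot tell whether the [10:00, 11:00) partition was
        // actually committed -- only the committer knows. If it has not been
        // committed yet, treating this element as late data would be wrong.
    }
}
{code}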
So, as far as I am concerned, late event processing should be the eventual
solution to this problem. But it cannot be implemented without a lot of
effort, and even once it has been implemented, it will still be hard to
understand and use (from my perspective). My solution may not directly solve
or fix the problem, but it is effective at surfacing the problem and easy to
implement. Maybe we can adopt my solution now, and keep late event processing
as the long-term, eventual solution.
> Introduce `Repeated Partition Commit Check` in
> `org.apache.flink.table.filesystem.PartitionCommitPolicy`
> ---------------------------------------------------------------------------------------------------------
>
> Key: FLINK-19706
> URL: https://issues.apache.org/jira/browse/FLINK-19706
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem, Connectors / Hive, Table SQL /
> Runtime
> Reporter: Lsw_aka_laplace
> Priority: Minor
> Attachments: image-2020-10-19-16-47-39-354.png,
> image-2020-10-19-16-57-02-661.png, image-2020-10-19-17-00-27-255.png,
> image-2020-10-19-17-03-21-558.png, image-2020-10-19-18-16-35-083.png
>
>
> Hi all,
> Recently we have been devoted to using Hive Streaming Writing to
> accelerate the data sync of our Hive-based data warehouse, and we eventually
> made it work.
> For production purposes, a lot of metrics/logs/measures were added in
> order to help us analyze runtime information and fix unexpected problems.
> Among them, we found that checking for repeated partition commits is the most
> important one. So here, we are willing to contribute this back to the
> community.
> If this proposal is meaningful, I am happy to introduce my design and
> implementation.
>
> Looking forward to ANY opinion~
>
>
> ----UPDATE ----
>
> Our users (who use our own platform to build their own Flink jobs) raised
> some requests. One of them is that once a partition is committed, the data in
> that partition should be regarded as frozen or complete. [Committing a
> partition] seems like a guarantee in some way (though we all know it is hard
> to make it a real promise) that the partition is complete. Certainly, we take
> a lot of measures to try to achieve [partition commit means complete]. So if
> a partition is committed twice or more, for us there must be something wrong,
> or our measures are insufficient. On the other hand, it also tells us to take
> remedial action to avoid data loss or incomplete data.
>
> So first of all, it is important to let us know that a certain partition
> has been committed repeatedly, so that we can do the following things ASAP:
> 1. analyze the reason or the cause
> 2. perform some trade-off operations
> 3. improve our code/measures
>
> — Design and Implementation---
> There are basically two ways; both of them have been used in our
> production environment.
> 1. Add a check to CommitPolicy that is called before partition commit
> !image-2020-10-19-16-47-39-354.png|width=576,height=235!
> //{color:#ffab00}Newly posted, see here{color}
> !image-2020-10-19-18-16-35-083.png|width=725,height=313!
> 1.1 As the picture shows, add `checkPartitionExists` and implement it in a
> sub-class
> !image-2020-10-19-17-03-21-558.png|width=1203,height=88!
> 1.2 Call `checkPartitionExists` before partition commit
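> For illustration, a minimal sketch of this hook on the policy interface. The
> default body, the stand-in Context, and the committer-side reaction shown in
> the trailing comments are assumptions made for the example, not the actual
> patch:
> {code:java}
> /** Sketch: only commit(...) exists on the interface today; the check is the proposal. */
> public interface PartitionCommitPolicy {
>
>     /** Minimal stand-in for the real Context, which carries the partition info. */
>     interface Context {
>         String partitionPath();
>     }
>
>     void commit(Context context) throws Exception;
>
>     /** Proposed hook: return true if this partition was committed before. */
>     default boolean checkPartitionExists(Context context) throws Exception {
>         return false; // overridden by concrete policies, e.g. a metastore-backed one
>     }
> }
>
> // 1.2 -- the committer could then do something like this before each commit:
> // if (policy.checkPartitionExists(context)) {
> //     LOG.warn("Partition {} is committed repeatedly", context.partitionPath());
> // }
> // policy.commit(context);
> {code}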
>
> 2. Build a bounded cache of committed partitions and check it every time
> before partition commit
> (actually this cache is supposed to be operator state)
> !image-2020-10-19-16-57-02-661.png|width=1298,height=57!
> 2.1 Build the cache
> !image-2020-10-19-17-00-27-255.png|width=1235,height=116!
> 2.2 Check it before each commit
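> For illustration, a minimal sketch of such a bounded cache. The LRU policy,
> the capacity, and the method names are assumptions for the example; as noted
> above, in practice the cache is supposed to live in operator state so that it
> survives restarts:
> {code:java}
> import java.util.LinkedHashMap;
> import java.util.Map;
>
> public class CommittedPartitionCache {
>
>     private final Map<String, Boolean> cache;
>
>     public CommittedPartitionCache(final int capacity) {
>         // Access-ordered LinkedHashMap that evicts the eldest entry once over capacity.
>         this.cache = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
>             @Override
>             protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
>                 return size() > capacity;
>             }
>         };
>     }
>
>     /** Returns true if the partition was seen before, i.e. this is a repeated commit. */
>     public boolean checkAndRecord(String partitionPath) {
>         return cache.put(partitionPath, Boolean.TRUE) != null;
>     }
> }
> {code}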
>
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)