[ https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-9592:
----------------------------------
Labels: auto-deprioritized-major auto-unassigned pull-request-available
stale-minor (was: auto-deprioritized-major auto-unassigned
pull-request-available)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue has been marked as
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is
still Minor, please either assign yourself or give an update. Afterwards,
please remove the label, or in 7 days the issue will be deprioritized.
> Notify on moving file into pending/ final state
> -----------------------------------------------
>
> Key: FLINK-9592
> URL: https://issues.apache.org/jira/browse/FLINK-9592
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / FileSystem
> Reporter: Rinat Sharipov
> Priority: Minor
> Labels: auto-deprioritized-major, auto-unassigned,
> pull-request-available, stale-minor
>
> Hi mates, I have a proposal regarding the functionality of BucketingSink.
>
> While implementing one of our tasks, we needed to create a meta-file
> containing the path of, and additional information about, each file created
> by BucketingSink once it has been moved into its final location.
> Unfortunately, this behaviour is currently not available to us.
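As an illustration of the use case above, the following sketch writes a meta-file next to a file once it reaches its final location. It uses the local file system through java.nio as a stand-in for the Hadoop FileSystem that BucketingSink works with; the class name MetaFileWriter, the `.meta` suffix and the meta-file contents are hypothetical choices, not part of any Flink API.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper: writes "<file>.meta" next to a file that reached its final state. */
class MetaFileWriter {
    static final String SUFFIX = ".meta"; // hypothetical naming convention

    /** Writes the file's absolute path and size into a sibling meta-file and returns its path. */
    static Path writeMetaFile(Path finalFile) {
        try {
            Path meta = finalFile.resolveSibling(finalFile.getFileName() + SUFFIX);
            String content = "path=" + finalFile.toAbsolutePath() + "\n"
                    + "size=" + Files.size(finalFile) + "\n";
            Files.write(meta, content.getBytes(StandardCharsets.UTF_8));
            return meta;
        } catch (IOException e) {
            // wrap so callers need not handle a checked exception in this sketch
            throw new UncheckedIOException(e);
        }
    }
}
```

In a real sink this would run from whatever hook fires on the pending-to-final transition, which is exactly what the current API does not expose.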
>
> We've implemented our own sink that lets us register notifiers, which are
> called when a file's state changes, but the current API doesn't allow us to
> add such behaviour through inheritance.
>
> It seems that such functionality could be useful and could become part of
> the BucketingSink API.
> What do you think? Should I open a PR?
> Sincerely yours,
> *Rinat Sharipov*
> Software Engineer at 1DMP CORE Team
>
> mobile: +7 (925) 416-37-26
> Clever{color:#4f8f00}DATA{color}
> make your data clever
>
> ------------------------------------------------------------------------------------------------------------------------
>
> Hi,
> I see that this could be a useful feature. What exactly is currently
> preventing you from inheriting from BucketingSink? Maybe it would be enough
> to make BucketingSink easier to extend.
> One thing that could collide with such a feature is that Kostas is currently
> working on a larger rework/refactoring of BucketingSink.
> Piotrek
> ________________________________________________________________________
>
> Hi guys, thanks for your reply.
> The following code references apply to the
> *org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class* at
> the *release-1.5.0 tag*.
>
> Currently, BucketingSink has the following file lifecycle.
>
> Files move from the open (in-progress) state to the pending state as follows:
> # On each element (method *invoke*, line 434), we check that a suitable
> bucket exists and contains an open file; if no open file exists, we create
> one and write the element to it.
> # On each element (method *invoke*, line 434), we check that the open file
> doesn't exceed the configured limits; if the limits are exceeded, we close it
> and move it into the pending state using the private method
> *closeCurrentPartFile* (line 568).
> # On each timer callback (method *onProcessingTime*, line 482), we check
> whether no elements have been added to the open file for longer than the
> specified period; if so, we close it using the same private method
> *closeCurrentPartFile* (line 588).
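The three in-progress-to-pending transitions listed above can be modelled with the following simplified sketch. It is not BucketingSink's code: it tracks a single bucket, checks the size limit after writing, and the names PartFileLifecycle, batchSizeBytes and inactivityMillis are hypothetical. Time is passed in explicitly so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of one bucket's part-file lifecycle. */
class PartFileLifecycle {
    private final long batchSizeBytes;   // size limit that triggers rolling (step 2)
    private final long inactivityMillis; // max idle time before rolling (step 3)
    private String openFile;             // null when no part file is open
    private long openFileBytes;
    private long lastWriteTime;
    private int partCounter;
    final List<String> pending = new ArrayList<>();

    PartFileLifecycle(long batchSizeBytes, long inactivityMillis) {
        this.batchSizeBytes = batchSizeBytes;
        this.inactivityMillis = inactivityMillis;
    }

    /** Steps 1 and 2: called for every incoming element. */
    void invoke(int elementBytes, long now) {
        if (openFile == null) {
            openFile = "part-" + (partCounter++); // step 1: open a new part file
            openFileBytes = 0;
        }
        openFileBytes += elementBytes;            // "write" the element
        lastWriteTime = now;
        if (openFileBytes >= batchSizeBytes) {
            closeCurrentPartFile();               // step 2: size limit reached
        }
    }

    /** Step 3: called by the processing-time timer. */
    void onProcessingTime(long now) {
        if (openFile != null && now - lastWriteTime >= inactivityMillis) {
            closeCurrentPartFile();
        }
    }

    /** In-progress -> pending; the equivalent method in BucketingSink is private. */
    private void closeCurrentPartFile() {
        pending.add(openFile);
        openFile = null;
    }
}
```

Because every path into the pending state goes through one private method, that method is the natural (and currently the only) place to attach a notification.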
>
> So the only option we have is to call our hook from *closeCurrentPartFile*,
> which is private, so we copy-pasted the current implementation and injected
> our logic there.
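One way to make the sink "easier to extend", as suggested in the reply above, would be a protected no-op hook invoked from the private method, so that subclasses can add behaviour without copying it. The sketch below is hypothetical: ExtensibleSinkSketch, onMovedToPending and roll are illustrative names, not existing Flink methods.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base class: the close logic stays private but exposes a hook. */
class ExtensibleSinkSketch {
    private String currentPartFile = "part-0";

    /** Private, like closeCurrentPartFile in BucketingSink. */
    private void closeCurrentPartFile() {
        String closed = currentPartFile;
        currentPartFile = null;
        // (renaming the in-progress file to its pending name is omitted in this sketch)
        onMovedToPending(closed); // subclasses override this instead of copy-pasting
    }

    /** No-op by default. */
    protected void onMovedToPending(String path) {}

    /** Closes the current part file, if one is open. */
    void roll() {
        if (currentPartFile != null) {
            closeCurrentPartFile();
        }
    }
}

/** Example subclass that only adds the hook. */
class RecordingSink extends ExtensibleSinkSketch {
    final List<String> seen = new ArrayList<>();

    @Override
    protected void onMovedToPending(String path) {
        seen.add(path);
    }
}
```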
>
>
> Files move from the pending state to the final state during the checkpointing
> lifecycle, in *notifyCheckpointComplete* (line 657). This method is public,
> but it contains a lot of logic, including discovery of files in the pending
> state, synchronization of state access and its modification, etc.
>
> So we can't override it, or call the super method and add some logic
> afterwards, because when the current implementation changes the state of
> files, it removes them from the state, and we have no way of knowing which
> files have changed state.
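The following simplified sketch shows why an override that calls super first cannot observe the transitioned files: pending files are removed from state in the same call that finalizes them, so a hook has to run inside the loop, before the entries are discarded. Apart from notifyCheckpointComplete, all names are hypothetical, and pending files are modelled as strings keyed by checkpoint ID rather than BucketingSink's actual state classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Simplified model of pending -> final transitions on checkpoint completion. */
class PendingFilesState {
    // checkpoint ID -> files that became pending before that checkpoint
    private final TreeMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    private final Consumer<String> onFinal; // hypothetical hook

    PendingFilesState(Consumer<String> onFinal) {
        this.onFinal = onFinal;
    }

    void addPending(long checkpointId, String file) {
        pendingPerCheckpoint.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(file);
    }

    void notifyCheckpointComplete(long checkpointId) {
        // every checkpoint up to and including checkpointId is now complete
        Iterator<Map.Entry<Long, List<String>>> it =
                pendingPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            for (String file : it.next().getValue()) {
                // (renaming the pending file to its final name is omitted in this sketch)
                onFinal.accept(file); // must run before the entry is removed below
            }
            it.remove(); // removes from the backing map; the file list is gone afterwards
        }
    }

    int pendingCount() {
        int n = 0;
        for (List<String> files : pendingPerCheckpoint.values()) {
            n += files.size();
        }
        return n;
    }
}
```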
>
> To solve this problem, we've created the following interface:
>
> {code:java}
> import java.io.IOException;
> import java.io.Serializable;
>
> // Hadoop file system types, as used by BucketingSink
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
>
> /**
>  * The {@code FileStateChangeCallback} is used to perform any additional
>  * operations when {@link BucketingSink} moves a file from one state to
>  * another. For more information about state management of
>  * {@code BucketingSink}, see its official documentation.
>  */
> public interface FileStateChangeCallback extends Serializable {
>
>     /**
>      * Used to perform any additional operations related to moving a file
>      * into the next state.
>      *
>      * @param fs provides access to the file system
>      * @param path path to the file that was moved into the next state
>      * @throws IOException if something went wrong while performing file system operations
>      */
>     void call(FileSystem fs, Path path) throws IOException;
> }
> {code}
> We have also added the ability to register these callbacks in the
> BucketingSink implementation in the following manner:
>
> {code:java}
> public BucketingSink<T> registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) {...}
>
> public BucketingSink<T> registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) {...}
> {code}
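A minimal sketch of how the two registration methods could store callbacks and return `this` for chaining. To keep it self-contained, the callback receives only a String path instead of Hadoop's FileSystem and Path, CallbackSinkSketch is a hypothetical stand-in for BucketingSink, and firePending/fireFinal are hypothetical internal methods the sink would call at the corresponding transitions.

```java
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified stand-in for FileStateChangeCallback (String instead of FileSystem/Path). */
interface SimpleStateChangeCallback extends Serializable {
    void call(String path) throws IOException;
}

/** Hypothetical stand-in for BucketingSink<T>, showing only callback registration. */
class CallbackSinkSketch<T> {
    private final List<SimpleStateChangeCallback> onPending = new ArrayList<>();
    private final List<SimpleStateChangeCallback> onFinal = new ArrayList<>();

    public CallbackSinkSketch<T> registerOnPendingStateChangeCallback(SimpleStateChangeCallback... callbacks) {
        onPending.addAll(Arrays.asList(callbacks));
        return this; // allows chained registration
    }

    public CallbackSinkSketch<T> registerOnFinalStateChangeCallback(SimpleStateChangeCallback... callbacks) {
        onFinal.addAll(Arrays.asList(callbacks));
        return this;
    }

    void firePending(String path) throws IOException { fire(onPending, path); }

    void fireFinal(String path) throws IOException { fire(onFinal, path); }

    private static void fire(List<SimpleStateChangeCallback> callbacks, String path) throws IOException {
        for (SimpleStateChangeCallback cb : callbacks) {
            cb.call(path); // invoked in registration order
        }
    }
}
```

Usage: `new CallbackSinkSketch<String>().registerOnFinalStateChangeCallback(p -> ...)` chains like a builder; since callbacks are part of the sink, they must be Serializable, which is why the interface extends it.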
>
> I'm ready to discuss the best way to implement such hooks in the core
> implementation, or any other improvements that would let us add this
> functionality to our extension through the public API instead of
> copy-pasting the source code.
>
> Thanks for your help, mates =)
>
> Sincerely yours,
> *Rinat Sharipov*
>
> ________________________________________________________________________
>
> Hi,
>
> A couple of things:
>
> 1. Please create a Jira ticket with this proposal, so we can move the
> discussion off the user mailing list.
>
> I haven’t thought it through, so take my comments with a grain of salt,
> however:
>
> 2. If we were to go with such a callback, I would prefer to have a single
> BucketStateChangeCallback with methods like `onInProgressToPending(…)`,
> `onPendingToFinal`, `onPendingToCancelled(…)`, etc., as opposed to having
> one interface passed three or four times for different purposes.
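A sketch of the single-interface alternative, using Java 8 default methods so that an implementation overrides only the transitions it cares about. The method names come from the suggestion above; their parameter lists were elided there, so the single String path parameter is an assumption, as is the interface extending Serializable.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical single callback covering all bucket file transitions. */
interface BucketStateChangeCallback extends Serializable {
    default void onInProgressToPending(String path) {}

    default void onPendingToFinal(String path) {}

    default void onPendingToCancelled(String path) {}
}

/** Example: only interested in files reaching the final state. */
class FinalFilesCollector implements BucketStateChangeCallback {
    final List<String> finalFiles = new ArrayList<>();

    @Override
    public void onPendingToFinal(String path) {
        finalFiles.add(path);
    }
}
```

With default methods, adding a new transition later means adding one more method to the interface without breaking existing implementations.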
>
> 3. Another thing I had in mind is that BucketingSink could be rewritten to
> extend TwoPhaseCommitSinkFunction. In that case, with
>
> {code:java}
> public class BucketingSink2 extends TwoPhaseCommitSinkFunction<???>
> {code}
>
> the user could add their own hooks by overriding the following methods:
> BucketingSink2#beginTransaction, BucketingSink2#preCommit,
> BucketingSink2#commit and BucketingSink2#abort. For example:
>
> {code:java}
> public class MyBucketingSink extends BucketingSink2 {
>     @Override
>     protected void commit(??? txn) {
>         super.commit(txn);
>         // My hook on moving a file from the pending to the committed state
>     }
> }
> {code}
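To illustrate the override-based hook without depending on Flink, the sketch below uses a minimal, hypothetical stand-in for the transaction lifecycle (beginTransaction, preCommit, commit, abort). It is not Flink's TwoPhaseCommitSinkFunction, which has more type parameters and abstract methods; here a transaction is simply the name of a part file, and runOnce is a hypothetical driver for one successful transaction.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a two-phase-commit sink; a transaction is a part-file name. */
abstract class TwoPhaseSketch {
    final List<String> log = new ArrayList<>();

    protected String beginTransaction() { return "part-" + log.size(); }

    protected void preCommit(String txn) { log.add("pending:" + txn); } // in-progress -> pending

    protected void commit(String txn) { log.add("final:" + txn); }      // pending -> final

    protected void abort(String txn) { log.add("aborted:" + txn); }

    /** Drives one transaction through the successful path. */
    void runOnce() {
        String txn = beginTransaction();
        preCommit(txn);
        commit(txn);
    }
}

/** User hook added by overriding commit, as in the example above. */
class MyBucketingSinkSketch extends TwoPhaseSketch {
    @Override
    protected void commit(String txn) {
        super.commit(txn);
        log.add("hook:" + txn); // e.g. write a meta-file for the committed file
    }
}
```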
>
> Alternatively, we could implement support for the aforementioned callbacks in
> TwoPhaseCommitSinkFunction and provide this feature to the Kafka, Pravega and
> BucketingSink connectors at once.
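Implementing the callbacks once in a shared base class could look roughly like the following: the base class owns the listener list and notifies it after each commit, so every subclass gets the feature without overriding anything. All names are hypothetical, and CallbackAwareTwoPhaseSink is a stand-in, not Flink's TwoPhaseCommitSinkFunction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical base class that owns commit listeners for all subclasses. */
abstract class CallbackAwareTwoPhaseSink<TXN> {
    private final List<Consumer<TXN>> commitListeners = new ArrayList<>();

    public CallbackAwareTwoPhaseSink<TXN> addCommitListener(Consumer<TXN> listener) {
        commitListeners.add(listener);
        return this;
    }

    /** Subclass-specific commit, e.g. renaming a file or committing a Kafka transaction. */
    protected abstract void doCommit(TXN txn);

    /** Final, so listeners are always notified after a successful commit. */
    public final void commit(TXN txn) {
        doCommit(txn);
        for (Consumer<TXN> listener : commitListeners) {
            listener.accept(txn);
        }
    }
}

/** Example subclass: a file-based sink where a transaction is a file name. */
class FileSinkSketch extends CallbackAwareTwoPhaseSink<String> {
    final List<String> committed = new ArrayList<>();

    @Override
    protected void doCommit(String file) {
        committed.add(file);
    }
}
```

Making `commit` final and delegating to `doCommit` means a subclass cannot accidentally skip the notification, at the cost of changing how subclasses implement the commit step.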
>
> Piotrek
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)