Hi Till,

Since it has been a while, I would like to restart this discussion.
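
To make the "surefire" idea concrete, here is a minimal Java sketch of what I have in mind. It is not the code in the POC commit. It assumes a simplified, hypothetical `JobStatusListener` with a `jobStatusChanges(String jobId, String newJobStatus, long timestamp)` method; Flink's real interface uses `JobID` and `JobStatus` types, and its exact signature differs between versions. The hypothetical `SurefireJobStatusListener` wrapper forwards each status change to every custom listener and catches whatever a listener throws. As a result, one faulty plugin neither breaks the caller's own status handling nor stops the other listeners from being notified. Fatal JVM errors (`VirtualMachineError`) are still rethrown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical, simplified stand-in for Flink's JobStatusListener
 * (the real one takes JobID/JobStatus and differs between versions).
 */
interface JobStatusListener {
    void jobStatusChanges(String jobId, String newJobStatus, long timestamp);
}

/**
 * Hypothetical "surefire" wrapper: forwards every status change to all custom
 * listeners and isolates failures, so one faulty listener neither propagates
 * its exception to the caller nor prevents the remaining listeners from running.
 */
final class SurefireJobStatusListener implements JobStatusListener {
    private final List<JobStatusListener> delegates;
    private final AtomicInteger failures = new AtomicInteger();

    SurefireJobStatusListener(List<JobStatusListener> delegates) {
        this.delegates = List.copyOf(delegates);
    }

    @Override
    public void jobStatusChanges(String jobId, String newJobStatus, long timestamp) {
        for (JobStatusListener delegate : delegates) {
            try {
                delegate.jobStatusChanges(jobId, newJobStatus, timestamp);
            } catch (VirtualMachineError fatal) {
                // Never swallow fatal JVM errors such as OutOfMemoryError.
                throw fatal;
            } catch (Throwable t) {
                // Anything else (including errors like NoClassDefFoundError from a
                // broken plugin jar) is counted and reported, then skipped.
                failures.incrementAndGet();
                System.err.println("Custom JobStatusListener failed for job " + jobId + ": " + t);
            }
        }
    }

    int failureCount() {
        return failures.get();
    }
}

public class SurefireDemo {
    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        // A misbehaving listener that always throws.
        JobStatusListener faulty = (id, status, ts) -> {
            throw new IllegalStateException("boom");
        };
        // A well-behaved listener that records what it receives.
        JobStatusListener recording = (id, status, ts) -> seen.add(id + ":" + status);

        SurefireJobStatusListener surefire =
                new SurefireJobStatusListener(List.of(faulty, recording));
        surefire.jobStatusChanges("job-1", "RUNNING", System.currentTimeMillis());

        System.out.println(seen);                    // [job-1:RUNNING]
        System.out.println(surefire.failureCount()); // 1
    }
}
```

Catching exceptions only covers listeners that fail. A listener that blocks, for example on a slow HTTP call to an external platform, would still stall the thread that reports the status change. Dispatching to a separate executor may therefore also be worth discussing.
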
Would you please share some thoughts on this? Would it become a stability problem if we created a "surefire" listener that catches exceptions thrown by those custom listeners so that they cannot propagate? [1]

By the way, you mentioned that the outcome should be a design along with the POC. Shall we create a FLIP for this, where I can put the design of the change? Or should we continue the discussion to reach a consensus before writing a FLIP?

[1] https://github.com/predatorray/flink/commit/2cab8bb1119162213632db984d2eb7529b8140e7#diff-4815dccc0cbf42d48f1668d4a076d19ec96c196bc562c3d2edae4d7bf9b9bd89R72

Thanks,
Wenhao

On Sat, Apr 10, 2021 at 9:32 PM Wenhao Ji <predator....@gmail.com> wrote:
>
> Hi Till,
>
> Thanks for taking time out of your busy schedule. I have created a POC for this feature.
>
> The code change to the Flink source code would look like this commit:
> https://github.com/predatorray/flink/commit/2cab8bb1119162213632db984d2eb7529b8140e7
> Generally, the idea is that custom `JobStatusListener`s are created by their corresponding `JobStatusListenerFactory` implementations, which are loaded through the `PluginManager`. The listeners are created and initialized during the construction of the `ClusterEntrypoint`, and the `JobMaster` registers them when it starts scheduling.
>
> Users can then implement their own plugins. I have also written an example `JobStatusListener` plugin, which simply prints the job status changes:
> https://github.com/predatorray/flink-example-listener-plugin
>
> I hope you will have time to review the code and the idea.
>
> Thanks again!
>
> Wenhao
>
> On Wed, Mar 10, 2021 at 11:23 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>
>> Hi Wenhao,
>>
>> Aljoscha might not be as responsive as before. Surely you can create a POC to evaluate different approaches. But the outcome should be a design which we discuss before starting to implement the code properly. At the moment the community might be a bit busy with the upcoming feature freeze (just for your information).
>>
>> Cheers,
>> Till
>>
>> On Wed, Mar 10, 2021 at 3:40 PM Wenhao Ji <predator....@gmail.com> wrote:
>>
>> > Hi Till. Indeed, there is currently no proper solution other than polling. It is painful to have such code in our platform, since keeping the polling running periodically consumes a lot of resources when there are hundreds of Flink clusters to maintain. Many of the polls are actually useless, as the job status seldom changes. Polling also means the status is not synchronized in time.
>> >
>> > Aljoscha, can I submit a PR first so that we can review it and discuss whether it would introduce any stability problems or other risks?
>> >
>> > Thanks,
>> > Wenhao
>> >
>> > On Sat, Jan 9, 2021 at 12:31 AM Jeff Zhang <zjf...@gmail.com> wrote:
>> >
>> > > Hi Till,
>> > >
>> > > IIUC, in application mode we already allow running user code in the JobManager.
>> > >
>> > > Till Rohrmann <trohrm...@apache.org> wrote on Fri, Jan 8, 2021 at 9:53 PM:
>> > >
>> > > > At the moment, this requirement has not come up very often. In general, I am always a bit cautious when adding functionality which executes user code in the JobManager, because it can easily become a stability problem. On the other hand, I can't think of a solution other than polling the job status at the moment.
>> > > >
>> > > > Cheers,
>> > > > Till
>> > > >
>> > > > On Fri, Jan 8, 2021 at 1:23 PM Aljoscha Krettek <aljos...@apache.org> wrote:
>> > > >
>> > > > > Till or Chesnay (cc'ed), have you thought about adding a hook on the JobMaster/JobManager to allow external systems to get push notifications about submitted jobs?
>> > > > >
>> > > > > If they are OK with such a feature, would you maybe be interested in implementing it yourself, Wenhao?
>> > > > >
>> > > > > Best,
>> > > > > Aljoscha
>> > > > >
>> > > > > On 2020/09/28 11:14, 季文昊 wrote:
>> > > > > > Hi Aljoscha,
>> > > > > >
>> > > > > > Yes, that is not enough, since the `JobListener`s are called only once, when `execute()` or `executeAsync()` is called. In order to sync the status, I also have to call `JobClient#getJobStatus` periodically.
>> > > > > >
>> > > > > > On Fri, Sep 25, 2020 at 8:12 PM Aljoscha Krettek <aljos...@apache.org> wrote:
>> > > > > >
>> > > > > >> Hi,
>> > > > > >>
>> > > > > >> I understand from your email that `StreamExecutionEnvironment.registerJobListener()` would not be enough for you because you want to be notified of changes on the cluster side, correct? That is, when the job status changes on the master.
>> > > > > >>
>> > > > > >> Best,
>> > > > > >> Aljoscha
>> > > > > >>
>> > > > > >> On 23.09.20 14:31, 季文昊 wrote:
>> > > > > >> > Hi there,
>> > > > > >> >
>> > > > > >> > I'm working on a Flink platform at my company, which provides a service to provision and manage multiple dedicated Flink clusters. The problem is that we want to sync a job's status without delay whenever it changes, once the job has been submitted through our platform.
>> > > > > >> >
>> > > > > >> > Since we want to update the status in time and keep our services stateless, polling a job's status periodically is not a good solution.
>> > > > > >> > I have not found any proper way to achieve this by letting a job manager push changes directly to our platform, except by changing the source code to register an additional `JobStatusListener` in the method `org.apache.flink.runtime.jobmaster.JobMaster#startScheduling`.
>> > > > > >> >
>> > > > > >> > I wonder if we can enhance `JobStatusListener` a little so that a Flink user can register a custom `JobStatusListener` at startup.
>> > > > > >> >
>> > > > > >> > To be specific, we can have a `JobStatusListenerFactory` interface and a corresponding `ServiceLoader<JobStatusListenerFactory>`, where the `JobStatusListenerFactory` has the following method:
>> > > > > >> > - JobStatusListener createJobStatusListener(Properties properties);
>> > > > > >> >
>> > > > > >> > Custom listeners will be created during the `JobMaster#startScheduling` method.
>> > > > > >> >
>> > > > > >> > If someone would like to implement their own `JobStatusListener`, they will package all the related classes into a standalone jar with a `META-INF/services/org.apache.flink.runtime.executiongraph.JobStatusListener` file and place it under the `lib/` directory.
>> > > > > >> >
>> > > > > >> > In addition, I found a Jira ticket similar to what I'm asking for, FLINK-17104, but I do not see any comment or update on it yet. I hope someone can help me move this feature forward or give me some suggestions about it.
>> > > > > >> >
>> > > > > >> > Thanks,
>> > > > > >> > Wenhao
>> > > > > >> >
>> > >
>> > > --
>> > > Best Regards
>> > >
>> > > Jeff Zhang
>> > >
>> >
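
For reference, the `JobStatusListenerFactory` + `ServiceLoader` proposal quoted above could be sketched roughly as follows. The interfaces are simplified, hypothetical stand-ins; Flink's real `JobStatusListener` uses `JobID`/`JobStatus` types. `loadListeners` is a hypothetical helper. One detail is worth noting: `java.util.ServiceLoader` looks for a provider-configuration file named after the service type it is asked to load. With `ServiceLoader<JobStatusListenerFactory>`, the jar would therefore need a file under `META-INF/services/` named after the fully qualified name of `JobStatusListenerFactory` (listing the factory implementations), rather than a file named after `JobStatusListener`. The POC discovers the factories through Flink's `PluginManager` instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.ServiceLoader;

/** Hypothetical, simplified stand-in for Flink's JobStatusListener. */
interface JobStatusListener {
    void jobStatusChanges(String jobId, String newJobStatus, long timestamp);
}

/** The factory interface proposed in this thread (name and package are assumptions). */
interface JobStatusListenerFactory {
    JobStatusListener createJobStatusListener(Properties properties);
}

public class ListenerLoaderDemo {
    /**
     * Hypothetical helper: discovers every JobStatusListenerFactory listed in a
     * META-INF/services file named after the factory's fully qualified name,
     * as visible to the given class loader, and asks each factory for one listener.
     */
    static List<JobStatusListener> loadListeners(ClassLoader classLoader, Properties properties) {
        List<JobStatusListener> listeners = new ArrayList<>();
        for (JobStatusListenerFactory factory :
                ServiceLoader.load(JobStatusListenerFactory.class, classLoader)) {
            listeners.add(factory.createJobStatusListener(properties));
        }
        return listeners;
    }

    public static void main(String[] args) {
        // Hypothetical: listener settings that would come from the cluster configuration.
        Properties properties = new Properties();
        List<JobStatusListener> listeners =
                loadListeners(ListenerLoaderDemo.class.getClassLoader(), properties);
        // With no provider jar on the classpath, nothing is discovered.
        System.out.println("Loaded " + listeners.size() + " listener(s)");
    }
}
```

`ServiceLoader` instantiates providers lazily. A broken provider entry therefore surfaces as a `ServiceConfigurationError` during iteration, which is another place where the loading code may want to isolate failures.
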