[ https://issues.apache.org/jira/browse/FLINK-17012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17091539#comment-17091539 ]
Piotr Nowojski edited comment on FLINK-17012 at 4/24/20, 12:56 PM:
-------------------------------------------------------------------
I've looked at the code and maybe providing {{AbstractInvokableFactory}}
wouldn't be that complicated? Things that would need to be touched:
* introduce {{AbstractInvokableFactory}} interface, with a single
{{AbstractInvokable create(Environment);}} method
* add support for {{AbstractInvokableFactory}} in the {{StreamGraph}}. Probably
the best way would be to completely replace {{AbstractInvokable}} with
{{AbstractInvokableFactory}} in the {{StreamGraph}}. For all of the existing
invokables that we do not want to migrate, we can provide a trivial
{{SimpleAbstractInvokableFactory}} whose {{create(Environment)}} method
would just call the constructor of the desired {{AbstractInvokable}}. I
mean this just as a backward compatibility layer, so that we do not have to
handle both {{AbstractInvokableFactory}} and {{AbstractInvokable}} at the same
time in the {{StreamNode}} or {{Task}} classes.
* use {{AbstractInvokableFactory}} in {{Task#loadAndInstantiateInvokable}}
* provide proper factories for all {{StreamTask}} subclasses (the biggest
effort?) and use them in the {{StreamGraph}}
* adjust existing tests
I guess it would still end up being 1000+ lines of code, but most of it
shouldn't be complicated (except for providing proper factories that would move
state backend initialisation from the {{#invoke()}} method to the factory
{{#create(...)}} call).
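
To make the proposal above more concrete, here is a minimal, self-contained
sketch of what the factory and the backward compatibility layer might look
like. It is not actual Flink code: {{Environment}} and {{AbstractInvokable}}
are reduced stand-ins defined locally so the example compiles on its own, and
{{LegacyTask}}, {{EagerInitTask}} and {{EagerInitTaskFactory}} are hypothetical
names used only for illustration. The reflective constructor lookup is one
possible way to implement {{SimpleAbstractInvokableFactory}}, assumed here
rather than taken from the codebase.

```java
import java.lang.reflect.Constructor;

final class InvokableFactorySketch {

    /** Stand-in for Flink's Environment, reduced to one method for the demo. */
    interface Environment {
        String getTaskName();
    }

    /** Stand-in for Flink's AbstractInvokable (the real invoke() declares "throws Exception"). */
    abstract static class AbstractInvokable {
        final Environment environment;

        AbstractInvokable(Environment environment) {
            this.environment = environment;
        }

        abstract void invoke();
    }

    /** The proposed interface: a single create(Environment) method. */
    interface AbstractInvokableFactory {
        AbstractInvokable create(Environment environment);
    }

    /** Compatibility layer: calls the (Environment) constructor of an unmigrated invokable. */
    static final class SimpleAbstractInvokableFactory implements AbstractInvokableFactory {
        private final Class<? extends AbstractInvokable> invokableClass;

        SimpleAbstractInvokableFactory(Class<? extends AbstractInvokable> invokableClass) {
            this.invokableClass = invokableClass;
        }

        @Override
        public AbstractInvokable create(Environment environment) {
            try {
                Constructor<? extends AbstractInvokable> ctor =
                        invokableClass.getDeclaredConstructor(Environment.class);
                return ctor.newInstance(environment);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(
                        "Cannot instantiate " + invokableClass.getName(), e);
            }
        }
    }

    /** Hypothetical unmigrated task: everything still happens in invoke(). */
    static final class LegacyTask extends AbstractInvokable {
        boolean invoked;

        LegacyTask(Environment environment) {
            super(environment);
        }

        @Override
        void invoke() {
            invoked = true;
        }
    }

    /** Hypothetical migrated task: receives already-initialized state from its factory. */
    static final class EagerInitTask extends AbstractInvokable {
        final String restoredState;

        EagerInitTask(Environment environment, String restoredState) {
            super(environment);
            this.restoredState = restoredState;
        }

        @Override
        void invoke() {
            // Processing only; initialization already happened in the factory.
        }
    }

    /** "Proper" factory: initialization moves from invoke() into create(). */
    static final class EagerInitTaskFactory implements AbstractInvokableFactory {
        @Override
        public AbstractInvokable create(Environment environment) {
            // Placeholder for state backend initialization.
            String state = "state-for-" + environment.getTaskName();
            return new EagerInitTask(environment, state);
        }
    }

    public static void main(String[] args) {
        Environment env = () -> "demo-task";
        new SimpleAbstractInvokableFactory(LegacyTask.class).create(env).invoke();
        new EagerInitTaskFactory().create(env).invoke();
    }
}
```

With this shape, the code that instantiates the invokable only ever sees
{{AbstractInvokableFactory}}, and the time spent inside {{create(...)}} is what
could be reported as the initialization stage, before {{invoke()}} starts.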
> Expose stage of task initialization
> -----------------------------------
>
> Key: FLINK-17012
> URL: https://issues.apache.org/jira/browse/FLINK-17012
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Metrics, Runtime / Task
> Reporter: Wenlong Lyu
> Priority: Major
>
> Currently a task switches to running before it is fully initialized: the
> running state does not take state initialization and operator initialization
> (#open) into account, which may take a long time to finish. As a result, there
> can be a weird phenomenon where all tasks are running but throughput is 0.
> I think it would be good if we could expose the initialization stage of tasks.
> What do you think?