Hi,

Yes, the fact that the operator can see isChainStart() and isChainEnd() is not 
good, in my opinion. This seems to be an implementation detail that an 
operator should not be aware of. For now it’s ok but maybe we can fix that 
later.

Regarding output edges and serialisers, I think it might be necessary to 
differentiate between an operator config that the operator “can see”, this 
would be very minimal, and an operator config that the task uses to set up the 
chain and other stuff. This would store things that are tied to one operator 
within the chain but that the operator itself must not be concerned with. What 
do you think?
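
To make the split concrete, here is a rough sketch of what I have in mind. All
class and field names below are made up for illustration and are not existing
Flink classes; the serializer and the edges are plain strings only to keep the
sketch self-contained.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical: the minimal config that the operator itself is allowed to read.
final class OperatorViewConfig implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String operatorName;
    // Stand-in for a real input serializer (assumption: a name is enough here).
    private final String inputSerializerName;

    OperatorViewConfig(String operatorName, String inputSerializerName) {
        this.operatorName = operatorName;
        this.inputSerializerName = inputSerializerName;
    }

    String getOperatorName() { return operatorName; }
    String getInputSerializerName() { return inputSerializerName; }
}

// Hypothetical: settings tied to one operator that only the task reads
// when it sets up the chain. The operator never receives this object.
final class OperatorTaskConfig implements Serializable {
    private static final long serialVersionUID = 1L;

    private final OperatorViewConfig operatorView;
    private final boolean chainStart;
    private final boolean chainEnd;
    private final List<String> outputEdges;

    OperatorTaskConfig(OperatorViewConfig operatorView, boolean chainStart,
                       boolean chainEnd, List<String> outputEdges) {
        this.operatorView = operatorView;
        this.chainStart = chainStart;
        this.chainEnd = chainEnd;
        this.outputEdges = new ArrayList<>(outputEdges); // defensive copy
    }

    // The task hands only this part to the operator.
    OperatorViewConfig getOperatorView() { return operatorView; }
    boolean isChainStart() { return chainStart; }
    boolean isChainEnd() { return chainEnd; }
    List<String> getOutputEdges() { return Collections.unmodifiableList(outputEdges); }
}

final class ConfigSplitSketch {
    // Builds an example config for the first operator of a two-operator chain.
    static OperatorTaskConfig example() {
        OperatorViewConfig view = new OperatorViewConfig("map-1", "StringSerializer");
        List<String> edges = new ArrayList<>();
        edges.add("map-1 -> sink-1");
        return new OperatorTaskConfig(view, true, false, edges);
    }

    public static void main(String[] args) {
        OperatorTaskConfig taskConfig = example();
        // Only the narrow view would be passed into the operator's setup.
        OperatorViewConfig visibleToOperator = taskConfig.getOperatorView();
        System.out.println(visibleToOperator.getOperatorName());
    }
}
```

With something like this, chain flags and output edges stay on the task side,
and the operator only ever gets the narrow view.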

Best,
Aljoscha

> On 5. Jul 2017, at 07:39, Xu Pingyong <xupingyong...@163.com> wrote:
> 
> Hi Aljoscha:
> 
> 
>    Let me sum up my thoughts.
>    1. Rename StreamConfig to StreamTaskConfig.
>    2. OperatorConfig can be changed to be serialisable. If StreamTaskConfig is 
> also serialisable, it cannot be deserialized when it is passed to the 
> jobManager, which does not depend on "flink-streaming-java".
>    3. The call getChainIndex() is used only in OperatorConfig.toString(), so it 
> can be removed. However, isChainStart() and isChainEnd() are used in 
> AbstractStreamOperator.setup(...).
> 
>    However, I am not sure whether to put some properties in StreamTaskConfig 
> or OperatorConfig. For example, the input serializer is used not only in the 
> operator but also in OperatorChain. Likewise, output edges and serialisers are 
> only used in OperatorChain now, but might the operator need to see and use 
> them later?
>    2)  streamOperator
>    4)  output edges and serializers.
> 
>   What do you think?
> 
> 
>    Best Regards!
> Xu Pingyong
> 
> 
> 
> 
> 
> 
> 
> 
> 
> At 2017-07-05 11:02:56, "Xu Pingyong" <xupingyong...@163.com> wrote:
>> Hi Aljoscha:
>> 
>> 
>> Yes, I agree with you that an operator should not see output edges and 
>> serialisers. The call getChainIndex() is used only in 
>> OperatorConfig.toString(), so it can be removed. However, isChainStart() and 
>> isChainEnd() are used in AbstractStreamOperator.setup(...).
>> 
>> 
>> But I think what Stephan meant was only to change OperatorConfig to be 
>> serialisable. If StreamConfig is also serialisable, it needs to be serialized 
>> into the Configuration, which remains the underlying storage as before and 
>> flows across modules.
>> 
>> 
>> Do you agree with my understanding?
>> 
>> 
>> Best Regards!
>> 
>> 
>> Xu Pingyong
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> At 2017-07-05 00:01:34, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>>> Hi,
>>> 
>>> Yes, but I think what Stephan was hinting at was to change both of them to 
>>> be serialisable while we are already working on this.
>>> 
>>> I think input serialiser is fine to have in OperatorConfig, you’re right! I 
>>> don’t see getChainIndex() used anywhere in the code, though. And the output 
>>> edges and serialisers also look like they should not be visible to the 
>>> operator.
>>> 
>>> What do you think?
>>> 
>>> Best,
>>> Aljoscha
>>> 
>>>> On 4. Jul 2017, at 17:52, xu <xupingyong...@163.com> wrote:
>>>> 
>>>> Hi Aljoscha:
>>>>   Thanks a lot for your advice.
>>>> 
>>>> 
>>>>   I think I do not need separate steps, because all I do is introduce 
>>>> OperatorConfig and move the fields. StreamConfig still relies on an 
>>>> underlying Configuration which flows from the client to the jobmanager 
>>>> and then to the task.
>>>> 
>>>> 
>>>>   The following configs are used in an operator now:
>>>>   2) input serializer is used in AsyncWaitOperator.class
>>>>   5) chain.index is used in AbstractStreamOperator.setup(...)
>>>> 
>>>> 
>>>>   However, what I put in the OperatorConfig is all the config that belongs 
>>>> to the operator: not only what the operator uses now, but also what the 
>>>> StreamTask uses to build the operator. With OperatorConfig, an operator 
>>>> cannot see config that belongs to others.
>>>> 
>>>> 
>>>>  Best Regards!
>>>>  JiPing
>>> 
