Hey,

Thank you!
We could fix it by calling initializeSerializers() at the start of the
invoke method.
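
To make the idea concrete, here is a rough, self-contained sketch of the ordering problem and the fix. All class names in it (SketchOutputGate, SketchRecordWriter, SketchStreamTask) are made-up stand-ins, not the actual Flink classes. The only thing taken from the thread is that the channel count is unknown at construction time, so initializeSerializers() has to run later, e.g. in invoke().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an output gate; NOT Flink's OutputGate.
class SketchOutputGate {
    private int numChannels = 0; // unknown until the runtime wires the gate

    // Simulates the runtime environment attaching channels after construction.
    void attachChannels(int n) {
        this.numChannels = n;
    }

    int getNumChannels() {
        return numChannels;
    }
}

// Hypothetical stand-in for a record writer; NOT Flink's RecordWriter.
class SketchRecordWriter {
    private final SketchOutputGate gate;
    private List<StringBuilder> serializers; // one buffer per channel, created lazily

    SketchRecordWriter(SketchOutputGate gate) {
        this.gate = gate; // channel count is not known yet at this point
    }

    // Mirrors the idea of initializeSerializers(): allocate per-channel
    // state only once the gate knows how many channels it has.
    void initializeSerializers() {
        serializers = new ArrayList<>();
        for (int i = 0; i < gate.getNumChannels(); i++) {
            serializers.add(new StringBuilder());
        }
    }

    int numSerializers() {
        return serializers == null ? 0 : serializers.size();
    }

    void emit(String record) {
        if (serializers == null || serializers.isEmpty()) {
            throw new IllegalStateException("serializers not initialized");
        }
        int channel = Math.floorMod(record.hashCode(), serializers.size());
        serializers.get(channel).append(record).append('\n');
    }
}

// Hypothetical task; stands in for something like our StreamTask.
class SketchStreamTask {
    private final SketchRecordWriter writer;

    SketchStreamTask(SketchRecordWriter writer) {
        this.writer = writer;
    }

    void invoke() {
        // By the time invoke() runs, the channels have been attached,
        // so this is a safe place to size the serializers.
        writer.initializeSerializers();
        writer.emit("hello");
    }
}

class DeferredSerializerSketch {
    public static void main(String[] args) {
        SketchOutputGate gate = new SketchOutputGate();
        SketchRecordWriter writer = new SketchRecordWriter(gate);
        SketchStreamTask task = new SketchStreamTask(writer);

        gate.attachChannels(4); // the "runtime" wires the gate later
        task.invoke();
        System.out.println(writer.numSerializers());
    }
}
```

If emit() were called before initializeSerializers() in this sketch, it would fail with an IllegalStateException, which is roughly the situation we hit with numChannels not being set.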

Regards,
Gyula

On Wed, Jul 2, 2014 at 12:20 AM, Ufuk Celebi <[email protected]> wrote:

> On 01 Jul 2014, at 16:49, Hermann Gábor <[email protected]> wrote:
>
> > Hey,
> >
> > We are trying to move the streaming code from the 0.5 release to 0.6, and
> > we've run into a problem. We extended AbstractInputTask, AbstractTask and
> > AbstractOutputTask classes to implement our components (StreamSource,
> > StreamTask, StreamSink), and it seems like they are replaced by
> > DataSourceTask, RegularPactTask and DataSinkTask respectively, so we
> > replaced them in our code too.
>
> The execution engine's task hierarchy has been simplified with
> 8c1d82a8ec674de6525319501c6be2674e3143f1 [1]. It does not differentiate
> between input and output tasks any more. I think the respective input and
> output task logic (e.g. for input splits) has moved to the classes you
> mentioned, but I'm not sure whether you really should subclass these if you
> only need a small subset of their functionality.
>
> At least for the previous AbstractTask you could just extend
> AbstractInvokable, because AbstractTask was itself only a thin subclass of
> AbstractInvokable.
>
> I'd say wait for Stephan's take on this. ;-)
>
> [1]
> https://github.com/apache/incubator-flink/commit/8c1d82a8ec674de6525319501c6be2674e3143f1
>
>
> > The problem is that the RecordWriter's
> > numChannels is not set because the RecordWriter's initializeSerializers()
> > method does not get called. Should we call this manually somewhere?
>
> Yes. This is needed to work around a flaw in the way that the runtime is
> instantiated, which is going to be refactored soon.
>
> The RecordWriter creates the OutputGate, but the RuntimeEnvironment
> initializes the channels of the output gates at a later point. That's why
> it is not known at construction time how many channels are attached to a
> Gate and consequently how many serializers are needed for the RecordWriter.
> The initializeSerializers method is a workaround for that.
>
> This could also be refactored stand-alone, but will be subsumed by the
> upcoming runtime changes for the intermediate data set partitions
> (FLINK-986). If it is important to you, I could also do a quick fix for it
> now.
>
>
