Hey,

Thank you! We could fix it by calling initializeSerializers() in the invoke method.

Regards,
Gyula

On Wed, Jul 2, 2014 at 12:20 AM, Ufuk Celebi <[email protected]> wrote:

> On 01 Jul 2014, at 16:49, Hermann Gábor <[email protected]> wrote:
>
> > Hey,
> >
> > We are trying to move the streaming code from the 0.5 release to 0.6, and
> > we've run into a problem. We extended AbstractInputTask, AbstractTask and
> > AbstractOutputTask classes to implement our components (StreamSource,
> > StreamTask, StreamSink), and it seems like they are replaced by
> > DataSourceTask, RegularPactTask and DataSinkTask respectively, so we
> > replaced them in our code too.
>
> The execution engine's task hierarchy has been simplified with
> 8c1d82a8ec674de6525319501c6be2674e3143f1 [1]. It does not differentiate
> between input and output tasks any more. I think the respective input and
> output task logic (e.g. for input splits) has moved to the classes you
> mentioned, but I'm not sure whether you really should subclass these if you
> only need a small subset of their functionality.
>
> At least for the previous AbstractTask you could just extend
> AbstractInvokable, because it was just subclassing AbstractInvokable before.
>
> I'd say wait for Stephan's take on this. ;-)
>
> [1]
> https://github.com/apache/incubator-flink/commit/8c1d82a8ec674de6525319501c6be2674e3143f1
>
> > The problem is that the RecordWriter's
> > numChannels is not set because the RecordWriter's initializeSerializers()
> > method does not get called. Should we call this manually somewhere?
>
> Yes. This is needed to work around a flaw in the way that the runtime is
> instantiated, which is going to be refactored soon.
>
> The RecordWriter creates the OutputGate, but the RuntimeEnvironment
> initializes the channels of the output gates at a later point. That's why
> it is not known at construction time how many channels are attached to a
> Gate and consequently how many serializers are needed for the RecordWriter.
> The initializeSerializers method is a workaround for that.
>
> This could also be refactored stand-alone, but will be subsumed by the
> upcoming runtime changes for the intermediate data set partitions
> (FLINK-986). If it is important to you, I could also do a quick fix for it
> now.
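The ordering problem Ufuk describes (the writer exists before the runtime attaches its channels, so the channel count can only be read later, at the start of invoke()) can be illustrated with a small, self-contained Java sketch. All classes and methods here (SketchOutputGate, SketchRecordWriter, SketchStreamTask, setUpWriters, attachChannel) are hypothetical stand-ins, not the Flink 0.6 API; only the names initializeSerializers() and invoke() come from this thread, and the round-robin emit logic is an assumption made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an output gate; NOT the Flink OutputGate class.
class SketchOutputGate {
    private final List<List<String>> channels = new ArrayList<List<String>>();

    // Called by the "runtime environment" after the writer already exists.
    void attachChannel() { channels.add(new ArrayList<String>()); }

    int getNumberOfChannels() { return channels.size(); }

    List<String> channel(int index) { return channels.get(index); }
}

// Hypothetical, simplified record writer that round-robins over channels.
class SketchRecordWriter {
    private final SketchOutputGate gate;
    private int numChannels = 0; // unknown when the constructor runs
    private int nextChannel = 0;

    SketchRecordWriter(SketchOutputGate gate) { this.gate = gate; }

    // Reads the channel count; must run after the channels have been attached.
    void initializeSerializers() { numChannels = gate.getNumberOfChannels(); }

    void emit(String record) {
        if (numChannels == 0) {
            throw new IllegalStateException("numChannels not set: call initializeSerializers() first");
        }
        gate.channel(nextChannel).add(record);
        nextChannel = (nextChannel + 1) % numChannels;
    }
}

// Hypothetical two-phase task: the writer is created first, records are emitted in invoke().
class SketchStreamTask {
    final SketchOutputGate gate = new SketchOutputGate();
    SketchRecordWriter writer;

    // Phase 1: the writer is constructed while the gate has no channels yet.
    void setUpWriters() { writer = new SketchRecordWriter(gate); }

    // Phase 2: channels are attached by now, so the writer can size itself.
    void invoke() {
        writer.initializeSerializers(); // the fix proposed in this thread
        for (int i = 0; i < 4; i++) {
            writer.emit("record-" + i);
        }
    }
}

public class InitSerializersSketch {
    public static void main(String[] args) {
        SketchStreamTask task = new SketchStreamTask();
        task.setUpWriters();
        // Between the two phases, the simulated runtime attaches two channels.
        task.gate.attachChannel();
        task.gate.attachChannel();
        task.invoke();
        // prints "[record-0, record-2] [record-1, record-3]"
        System.out.println(task.gate.channel(0) + " " + task.gate.channel(1));
    }
}
```

In this sketch, removing the initializeSerializers() call from invoke() makes emit() throw an IllegalStateException, which mirrors the unset numChannels symptom reported above.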
