On 01 Jul 2014, at 16:49, Hermann Gábor <[email protected]> wrote:
> Hey, > > We are trying to move the streaming code from the 0.5 release to 0.6, and > we've run into a problem. We extended AbstractInputTask, AbstractTask and > AbstractOutputTask classes to implement our components (StreamSource, > StreamTask, StreamSink), and it seems like they are replaced by > DataSourceTask, RegularPactTask and DataSinkTask respectively, so we > replaced them in our code too. The execution engine's task hierarchy has been simplified with 8c1d82a8ec674de6525319501c6be2674e3143f1 [1]. It does not differentiate between input and output tasks any more. I think the respective input and output task logic (e.g for input splits) has moved to the classes you mentioned, but I'm not sure whether you really should subclass these if you only need a small subset of their functionality. At least for the previous AbstractTask you could just extend AbstractInvokable, because it was just subclassing AbstractInvokable before. I'd say wait for Stephan's take on this. ;-) [1] https://github.com/apache/incubator-flink/commit/8c1d82a8ec674de6525319501c6be2674e3143f1 > The problem is that the RecordWriter's > numChannels is not set because the RecordWriter's initializeSerializers() > method does not get called. Should we call this manually somewhere? Yes. This is needed to work around a flaw in the way that the runtime is instantiated, which is going to be refactored soon. The RecordWriter creates the OutputGate, but the RuntimeEnvironment initializes the channels of the output gates at a later point. That's why it is not known at construction time how many channels are attached to a Gate and consequently how many serializers are needed for the RecordWriter. The initializeSerializers method is a work around for that. This could also be refactored stand-alone, but will be subsumed by the upcoming runtime changes for the intermediate data set partitions (FLINK-986). If it is important to you, I could also do a quick fix for it now. [2] https://github.com/apache/incubator-flink/commit/8c1d82a8ec674de6525319501c6be2674e3143f1
