https://beam.apache.org/documentation/io/developing-io-java/#implementing-the-reader-subclass is out of date. The notice at the top of that page says: "IMPORTANT: Use Splittable DoFn to develop your new I/O. For more details, read the new I/O connector overview."
On Fri, Sep 3, 2021 at 9:55 AM Alexey Romanenko <[email protected]> wrote:

> Hi Marco,
>
> I tried to answer your questions, and I also CC’ed Boyuan Zhang as the
> initial author of the SDF-based Read implementation for KafkaIO.
>
> Also, I’d recommend taking a look at the related PR discussion [1], which
> may give more detail on some of the internal decisions.
>
> Please see my answers inline.
>
> On 1 Sep 2021, at 18:13, Marco Robles <[email protected]> wrote:
>
> > I am taking KafkaIO as an example for the PulsarIO connector. During the
> > development of the new IO, I ran into some questions about the KafkaIO
> > implementation. I was wondering if anyone has experience with the
> > KafkaIO SDF implementation and might be able to help me.
> >
> > - What was taken into consideration when implementing the
> >   KafkaSourceDescriptor
> >   <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescriptor.java>
> >   which is used as input for the SDF in Kafka?
>
> IIRC, this class represents a Kafka topic partition that is then used in
> ReadFromKafkaDoFn to actually read the data. So, we can have a
> PCollection<KafkaSourceDescriptor> to read them in parallel.
>
> > - In the ReadFromKafkaDoFn
> >   <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java>
> >   class, you have to implement a getSize in order to estimate how much
> >   work it will take. What approach do you take to get an estimate for an
> >   unbounded source like Kafka?
>
> It is quite tricky to do with unbounded sources, so we try to estimate the
> size by the number of records for the current offset in the topic
> partition and the average record size, based on collected statistics (if
> any).
>
> > - For the SDF implementation, I suppose it will need a Source interface
> >   implementation
> >   <https://beam.apache.org/documentation/io/developing-io-java/#implementing-the-source-interface>
> >   and a Reader subclass
> >   <https://beam.apache.org/documentation/io/developing-io-java/#implementing-the-reader-subclass>?
> >   The documentation is kind of confusing on that part when you are
> >   working with SDF. Should it be treated as Unbounded for the
> >   source/reading part?
>
> Well, it’s actually the opposite. There are two types of Read
> implementation in Beam:
> - based on the Source interface that you mentioned (the deprecated one);
> - based on Splittable DoFn [2], which is what one should use (especially
>   for unbounded sources) for new IO connectors.
>
> [1] https://github.com/apache/beam/pull/11749
> [2] https://beam.apache.org/documentation/io/developing-io-overview/
>
> --
> Alexey
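
On the getSize question in the quoted thread: below is a minimal sketch, in plain Java with no Beam or Kafka dependency, of the kind of estimate Alexey describes (a record count multiplied by an average record size). The class BacklogSizeEstimator and its method names are hypothetical and are not part of KafkaIO. Two things here are assumptions of the sketch rather than statements from the thread: that "number of records" means the end offset minus the current offset, and that the raw record count is returned when no size statistics have been collected yet.

```java
/**
 * Hypothetical helper (not a Beam or KafkaIO class) that estimates how many
 * bytes remain to be read from one topic partition.
 */
final class BacklogSizeEstimator {
    private long recordCount; // number of records observed so far
    private long totalBytes;  // sum of the observed record sizes

    /** Records the serialized size of one consumed record. */
    void observe(long recordBytes) {
        recordCount++;
        totalBytes += recordBytes;
    }

    /** Average observed record size in bytes, or 0.0 if nothing was observed. */
    double averageRecordBytes() {
        return recordCount == 0 ? 0.0 : (double) totalBytes / recordCount;
    }

    /**
     * Estimated remaining work for the range [currentOffset, endOffset).
     * Assumption: with no statistics yet, fall back to the record count itself.
     */
    double estimateRemainingBytes(long currentOffset, long endOffset) {
        // Clamp at zero in case the end offset estimate lags behind the reader.
        long remainingRecords = Math.max(0L, endOffset - currentOffset);
        if (recordCount == 0) {
            return remainingRecords;
        }
        return remainingRecords * averageRecordBytes();
    }
}
```

For example, after observing two records of 100 and 300 bytes, a reader at offset 10 with an end offset of 25 would report 15 records at an average of 200 bytes each. In a real SDF, a value like this would presumably be what the getSize method returns, with the offsets taken from the current restriction.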
