Hello 👋! We ran some experiments with pushing a filter predicate down into the AvroSource, and it significantly improved the performance of the job. The test job was simple: it reads about 8B Avro specific records, filters them down to 22M, and then just counts them.
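To make the idea of pushing the predicate into the source concrete, here is a minimal sketch in plain Java. It has no Beam dependency. The loop is loosely modeled on the advance()/getCurrent() pattern of a Beam source reader, but it is not Beam's API. `FilteringReader` and `countMatching` are hypothetical names, and an `Iterator` of integers stands in for the decoded Avro records. The point it shows is that records failing the predicate are skipped inside the reader and never emitted downstream.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

public class FilteringReaderSketch {

  /** Hypothetical reader that applies a pushed-down predicate while advancing. */
  static final class FilteringReader<T> {
    private final Iterator<T> records;    // stands in for decoded Avro records
    private final Predicate<T> predicate; // the user-provided T => Boolean
    private T current;
    private boolean hasCurrent = false;

    FilteringReader(Iterator<T> records, Predicate<T> predicate) {
      this.records = records;
      this.predicate = predicate;
    }

    /** Moves to the next record that passes the predicate; non-matching records are dropped here. */
    boolean advance() {
      while (records.hasNext()) {
        T candidate = records.next();
        if (predicate.test(candidate)) {
          current = candidate;
          hasCurrent = true;
          return true;
        }
      }
      hasCurrent = false;
      return false;
    }

    /** Returns the record the reader is positioned on. */
    T getCurrent() {
      if (!hasCurrent) {
        throw new NoSuchElementException("reader is not positioned on a record");
      }
      return current;
    }
  }

  /** Counts only the records the reader emits, like the downstream count in the test job. */
  static <T> long countMatching(Iterator<T> records, Predicate<T> predicate) {
    FilteringReader<T> reader = new FilteringReader<>(records, predicate);
    long count = 0;
    while (reader.advance()) {
      reader.getCurrent(); // a real job would hand this record to the next step
      count++;
    }
    return count;
  }

  public static void main(String[] args) {
    List<Integer> records = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    long kept = countMatching(records.iterator(), r -> r % 4 == 0);
    System.out.println(kept); // prints 2
  }
}
```

In the built-in setup, the same filtering happens in a separate ParDo after the Read step, so every record is first emitted by the source.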
The version of the job using the built-in AvroIO was 4x slower than the version using a tweaked AvroIO/AvroSource, which eagerly drops unneeded records by applying a user-provided T => Boolean function. For the job with the built-in AvroIO, the logs show that the Avro Read step is fused with the following filtering ParDo.

The main question is: why doesn't fusing AvroIO with the filtering ParDo have the same effect as explicitly pushing the query down into the AvroSource? Does this mean that, even though these steps are fused, Beam still serializes and deserializes the records to pass them from the Read step to the filtering ParDo? The only plausible explanation we have is that the performance difference comes from the lower overhead of encoding/decoding SpecificRecords, but that encoding/decoding seems unnecessary between fused steps.

Thank you!
Kirill
