If I understand correctly, with Avro predicate push-down the records are filtered out at the level of the Avro file reader, so the dropped records are never even processed by Beam. Filtering happens at the "low" level of the file reader, which is very effective, especially if you only need 22M records out of 8B.
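Just to illustrate what filtering at the file-reader level means, here is a rough sketch with plain Avro, outside of Beam (the class name, path and field name are made up): records rejected by the predicate are dropped inside the read loop and never reach any downstream processing. The tweaked readNextRecord you quote below does essentially the same thing, just inside the Beam reader.

import java.io.File;
import java.io.IOException;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class ReaderLevelFilterSketch {

  public static void main(String[] args) throws IOException {
    long matched = 0;
    // Records that don't pass the predicate are dropped right here,
    // inside the read loop, before any further processing happens.
    try (DataFileReader<GenericRecord> reader =
        new DataFileReader<>(new File("/tmp/records.avro"), new GenericDatumReader<>())) {
      GenericRecord record = null;
      while (reader.hasNext()) {
        record = reader.next(record);
        if ("value".equals(String.valueOf(record.get("field")))) {
          matched++;
        }
      }
    }
    System.out.println("matched = " + matched);
  }
}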
On the other hand, with fusion but without predicate pushdown, AvroIO reads all the data, passes it downstream in your pipeline, and only then are the records filtered out. So fusion should help with ser/deser and with copying data over the network from one worker to another, but it won't eliminate reading all 8B records. I'm not familiar with the fusion implementation details on Dataflow, so I may be mistaken, but this is how I understand it works.

We observed results similar to yours with ParquetIO/GenericRecords and schema projection/predicate push-down (ParquetIO already supports it [1]). IMHO, this means that filtering at the "low" level of your storage format is much more effective than at a "higher" data-processing level.

There is also ongoing work to add projection pushdown across Schema-aware PTransforms in the Beam Java SDK [2]. Once it is implemented, pushdown should be more transparent for users, at least when using Beam SQL transforms (and, I hope, other Schema-aware PTransforms as well), and should work automatically with some IOs that support it.

[1] https://issues.apache.org/jira/browse/BEAM-7925
[2] https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/

— Alexey

> On 29 Oct 2021, at 02:52, Kirill Panarin <[email protected]> wrote:
>
> We are using DataflowRunner. The job is actually implemented in scio
> <https://github.com/spotify/scio>:
>
> // ASchema extends SpecificRecord
>
> object ReadFullWithFilterJob {
>
>   def main(cmdArgs: Array[String]): Unit = {
>
>     val (sc, args) = ContextAndArgs(cmdArgs)
>     val inputPath = args("input")
>
>     sc.avroFile[ASchema](inputPath)
>       .filter(x => x.field == "value")
>       .count
>
>     sc.run()
>   }
> }
>
> When translated to Beam it would look like this:
>
> import org.apache.avro.specific.SpecificRecord;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.AvroIO;
> import org.apache.beam.sdk.transforms.Count;
> import org.apache.beam.sdk.transforms.Filter;
> import org.apache.beam.sdk.values.PCollection;
>
> public class Test {
>
>   public static void main(String[] args) {
>     Pipeline p = Pipeline.create();
>
>     // Read Avro-generated classes from files on GCS
>     PCollection<Long> count =
>         p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro"))
>             .apply(Filter.by(x -> "value".equals(String.valueOf(x.field))))
>             .apply(Count.globally());
>
>     p.run();
>   }
> }
>
> So basically just AvroIO -> Filter -> Count.
>
> The job which runs faster uses the tweaked AvroSource.readNextRecord
> <https://github.com/apache/beam/blob/v2.33.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java#L642-L652>:
>
> @Override
> public boolean readNextRecord() throws IOException {
>   if (currentRecordIndex >= numRecords) {
>     return false;
>   }
>   boolean recordRead = false;
>
>   while (currentRecordIndex < numRecords) {
>     // Read and parse the next record from the block.
>     Object record = reader.read(null, decoder);
>     currentRecord =
>         (mode.parseFn == null) ? ((T) record) : mode.parseFn.apply((GenericRecord) record);
>     currentRecordIndex++;
>
>     // Keep reading until a record passes the user-provided filter or the block is exhausted.
>     if (mode.filterFn == null
>         || ((SerializableFunction<T, Boolean>) mode.filterFn).apply(currentRecord)) {
>       recordRead = true;
>       break;
>     }
>   }
>   return recordRead;
> }
>
> filterFn is provided by the user from outside and eagerly filters out records, e.g.
> (in scio):
>
> object ReadFullWithFilterPushdownJob {
>
>   def main(cmdArgs: Array[String]): Unit = {
>
>     val (sc, args) = ContextAndArgs(cmdArgs)
>     val inputPath = args("input")
>
>     sc.customInput("predicatedAvro",
>       new AvroRead[ASchema](inputPath,
>         classOf[ASchema],
>         x => x.field == "value")
>     )
>
>     sc.run()
>   }
> }
>
> On Thu, Oct 28, 2021 at 8:07 PM Reuven Lax <[email protected]> wrote:
> It would also help if you shared your code.
>
> On Thu, Oct 28, 2021 at 5:05 PM Reuven Lax <[email protected]> wrote:
> Which runner are you using? Fusion (at least for non-portable pipelines) is
> generally implemented by the runner, not by Beam.
>
> On Thu, Oct 28, 2021 at 3:05 PM Kirill Panarin <[email protected]> wrote:
> Hello 👋!
>
> We did some experiments with pushing a filter predicate down into the AvroSource,
> and it significantly improved the performance of the job. The test job was simple:
> it reads about 8B Avro specific records, filters them down to 22M, and in the end
> just counts them.
>
> The version of the job with the built-in AvroIO was 4x slower than the job with the
> tweaked AvroIO/AvroSource, which eagerly drops unneeded records by applying a
> user-provided T => Boolean function. For the job with the built-in AvroIO we see
> log messages about the Avro Read step being fused with the subsequent filtering ParDo.
>
> The main question is: why doesn't fusing AvroIO with the filtering ParDo have the
> same effect as the explicit predicate pushdown into the AvroSource? Does this mean
> that even though these steps are fused, Beam still serializes/deserializes the
> records to transfer them from the Read step to the filtering ParDo? The only
> feasible explanation we have is that the difference in performance comes from the
> lower overhead of coding/decoding SpecificRecords, but that coding/decoding seems
> unnecessary for fused steps.
>
> Thank you!
> Kirill
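P.S. One quick way to probe the coding/decoding-overhead hypothesis without touching AvroSource might be to read the files as GenericRecords with AvroIO.parseGenericRecords and keep only the field your filter needs (a kind of manual projection). This is only a rough sketch reusing the path and field name from your examples; it is not real predicate pushdown, since all 8B records are still read and decoded once.

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;

public class ReadGenericWithManualProjection {

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Parse each GenericRecord into just the one field the filter looks at,
    // so downstream (fused) stages only ever see a small String instead of
    // the whole SpecificRecord.
    SerializableFunction<GenericRecord, String> projectField =
        r -> String.valueOf(r.get("field"));

    PCollection<Long> count =
        p.apply(
                AvroIO.parseGenericRecords(projectField)
                    .from("gs://my_bucket/path/to/records-*.avro")
                    .withCoder(StringUtf8Coder.of()))
            .apply(Filter.by((String v) -> "value".equals(v)))
            .apply(Count.globally());

    p.run();
  }
}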
