If I understand correctly, with Avro predicate push-down the records are filtered 
out at the level of the Avro file reader, so the rejected records are never 
processed by Beam at all. Filtering at this “low” level of the file reader is 
very effective, especially if you only need 22M records out of 8B.
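
To illustrate the idea, here is a minimal sketch with plain Avro (outside of Beam); 
the file path and field name are made up. The predicate is evaluated while iterating 
the DataFileReader, so non-matching records never leave the reader:

import java.io.File;
import java.io.IOException;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class AvroReaderFilterSketch {
  public static void main(String[] args) throws IOException {
    try (DataFileReader<GenericRecord> reader =
        new DataFileReader<>(new File("records.avro"), new GenericDatumReader<>())) {
      while (reader.hasNext()) {
        GenericRecord record = reader.next();
        // The predicate is applied right at the reader, so non-matching
        // records are dropped before any further processing.
        if ("value".equals(String.valueOf(record.get("field")))) {
          // hand only matching records to downstream processing
          System.out.println(record);
        }
      }
    }
  }
}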

On the other hand, with fusion but without predicate pushdown, AvroIO still reads 
all the data, passes it downstream in your pipeline and only then filters out the 
records. So, fusion should help with ser/deser and with copying the data over the 
network from one worker to another, but it won’t help to avoid reading all these 
8B records. I’m not familiar with the fusion implementation details on Dataflow, 
so maybe I’m mistaken here, but this is how I understand it works.

We observed results similar to yours with ParquetIO/GenericRecords and schema 
projection/predicate push-down (ParquetIO already supports it [1]). IMHO, this 
means that filtering at the “low” level of your storage format is much more 
effective than at the “higher” data processing level.
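
For reference, a minimal sketch of column projection with ParquetIO; the schemas 
and the GCS path are made up for illustration, and I’m assuming the 
ParquetIO.Read.withProjection(projectionSchema, encoderSchema) API added under [1]:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.values.PCollection;

public class ParquetProjectionSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Full schema of the stored records vs. the columns we actually need.
    Schema fullSchema = SchemaBuilder.record("ASchema").fields()
        .requiredString("field")
        .requiredLong("other")
        .endRecord();
    Schema projection = SchemaBuilder.record("ASchema").fields()
        .requiredString("field")
        .endRecord();

    // Only the projected columns are materialized by the Parquet reader.
    PCollection<GenericRecord> records =
        p.apply(
            ParquetIO.read(fullSchema)
                .from("gs://my_bucket/path/to/records-*.parquet")
                .withProjection(projection, projection));

    p.run();
  }
}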

There is also ongoing work to add projection pushdown across Schema-aware 
PTransforms in the Beam Java SDK [2]. Once it is implemented, pushdown should 
become more transparent for users, at least when using Beam SQL transforms 
(though, I hope, with other Schema-aware PTransforms as well), and it should 
work automatically with the IOs that support it.
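
As a rough illustration (the schema and values below are made up): with Beam SQL 
the filter and projection are expressed declaratively, which is what would allow 
the SDK to push them into a supporting IO once [2] lands; today the query below 
is still evaluated after the read.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class SqlPushdownSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Made-up schema standing in for the Avro-generated record.
    Schema schema = Schema.builder().addStringField("field").build();

    PCollection<Row> input =
        p.apply(
            Create.of(Row.withSchema(schema).addValue("value").build())
                .withRowSchema(schema));

    // The single input PCollection is visible to SQL as the table PCOLLECTION.
    PCollection<Row> filtered =
        input.apply(
            SqlTransform.query("SELECT field FROM PCOLLECTION WHERE field = 'value'"));

    p.run();
  }
}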

[1] https://issues.apache.org/jira/browse/BEAM-7925
[2] https://docs.google.com/document/d/1eHSO3aIsAUmiVtfDL-pEFenNBRKt26KkF4dQloMhpBQ/


—
Alexey


> On 29 Oct 2021, at 02:52, Kirill Panarin <[email protected]> wrote:
> 
> We are using DataflowRunner. The job is actually implemented in scio 
> (https://github.com/spotify/scio):
> 
> // ASchema extends SpecificRecord
> 
> object ReadFullWithFilterJob {
> 
>   def main(cmdArgs: Array[String]): Unit = {
> 
>     val (sc, args) = ContextAndArgs(cmdArgs)
>     val inputPath = args("input")
> 
>     sc.avroFile[ASchema](inputPath)
>       .filter(x => x.field == "value")
>       .count
> 
>     sc.run()
>   }
> 
> }
> 
> When translated to Beam it would look like this:
> 
> import org.apache.avro.specific.SpecificRecord;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.AvroIO;
> import org.apache.beam.sdk.transforms.Count;
> import org.apache.beam.sdk.transforms.Filter;
> import org.apache.beam.sdk.values.PCollection;
> 
> public class Test {
> 
>   public static void main(String[] args) {
>     Pipeline p = Pipeline.create();
> 
>     // Read Avro-generated classes from files on GCS
>     PCollection<Long> count =
>         p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro"))
>             .apply(Filter.by(x -> "value".equals(x.field)))
>             .apply(Count.globally());
> 
>     p.run();
>   }
> }
> So basically just AvroIO -> Filter -> Count.
> 
> The job which runs faster uses a tweaked AvroSource.readNextRecord 
> (https://github.com/apache/beam/blob/v2.33.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java#L642-L652):
> 
> @Override
> public boolean readNextRecord() throws IOException {
>   // Keep reading until a record passes the filter or the block is exhausted,
>   // so rejected records are never emitted downstream.
>   while (currentRecordIndex < numRecords) {
>     Object record = reader.read(null, decoder);
>     currentRecordIndex++;
> 
>     currentRecord =
>         (mode.parseFn == null) ? ((T) record) : mode.parseFn.apply((GenericRecord) record);
> 
>     if (mode.filterFn == null
>         || ((SerializableFunction<T, Boolean>) mode.filterFn).apply(currentRecord)) {
>       return true;
>     }
>   }
>   return false;
> }
> 
> filterFn is provided by the user and eagerly filters out records, e.g. (in scio):
> 
> object ReadFullWithFilterPushdownJob {
> 
>   def main(cmdArgs: Array[String]): Unit = {
> 
>     val (sc, args) = ContextAndArgs(cmdArgs)
>     val inputPath = args("input")
> 
>     sc.customInput("predicatedAvro",
>       new AvroRead[ASchema](inputPath, classOf[ASchema],
>         x => x.field == "value")
>     )
> 
>     sc.run()
>   }
> 
> }
> 
> On Thu, Oct 28, 2021 at 8:07 PM Reuven Lax <[email protected]> wrote:
> It would also help if you shared your code.
> 
> On Thu, Oct 28, 2021 at 5:05 PM Reuven Lax <[email protected]> wrote:
> Which runner are you using? Fusion (at least for non-portable pipelines) is 
> generally implemented by the runner, not by Beam.
> 
> On Thu, Oct 28, 2021 at 3:05 PM Kirill Panarin <[email protected]> wrote:
> Hello 👋!
> 
> We did some experiments with pushing a filter query down to the AvroSource, and 
> it significantly improved the performance of the job. The test job was 
> simple: it reads about 8B Avro specific records, filters them down to 
> 22M and then simply counts them.
> 
> The version of the job with the built-in AvroIO was 4x slower than the job 
> with the tweaked AvroIO/Source, which eagerly drops unneeded records by 
> applying a user-provided T => Boolean function. For the job with the built-in 
> AvroIO we see log messages about the Avro Read step being fused with 
> the following filtering ParDo.
> 
> The main question is: why doesn't fusing AvroIO with the filtering ParDo have 
> the same effect as the explicit query pushdown into the AvroSource? 
> Does this mean that even though these steps are fused, Beam still 
> serializes/deserializes the records to transfer them from the Read step to the 
> filtering ParDo? The only feasible explanation we have is that the difference 
> in performance could be attributed to the lower overhead of coding/decoding 
> SpecificRecords, but this seems unnecessary for fused steps.
> 
> Thank you!
> Kirill
> 