We are using DataflowRunner. The job is actually implemented in scio
<https://github.com/spotify/scio>:

// ASchema extends SpecificRecord

object ReadFullWithFilterJob {

  def main(cmdArgs: Array[String]): Unit = {

    val (sc, args) = ContextAndArgs(cmdArgs)
    val inputPath = args("input")

    sc.avroFile[ASchema](inputPath)
      .filter(x => x.field == "value")
      .count

    sc.run()
  }

}


When translated to Beam it would look like this:


import org.apache.avro.specific.SpecificRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.values.PCollection;

public class Test {

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Read Avro-generated classes from files on GCS
    PCollection<Long> count =
        p.apply(AvroIO.read(AvroAutoGenClass.class)
                .from("gs://my_bucket/path/to/records-*.avro"))
            .apply(Filter.by(x -> "value".equals(x.field)))
            .apply(Count.globally());

    p.run();
  }
}

So basically just AvroIO -> Filter -> Count.
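To make explicit why the amount of data crossing the Read -> Filter boundary matters: even when the steps are fused, every record is emitted by the source before the filter sees it, so any fixed per-element cost at that boundary is paid once per input record, while with pushdown it is paid only for records that survive the predicate. A back-of-the-envelope model (plain Java, no Beam; the per-element overhead is an illustrative assumption, not a measurement):

```java
import java.util.Locale;

public class PushdownCostModel {
  public static void main(String[] args) {
    long totalRecords = 8_000_000_000L; // records read by the job (from this thread)
    long matching = 22_000_000L;        // records that pass the filter (from this thread)
    double perElementNs = 100.0;        // hypothetical fixed cost per element at the boundary

    // Fused Read -> Filter: every record crosses the boundary before being filtered.
    double fusedSeconds = totalRecords * perElementNs / 1e9;
    // Pushdown: only matching records are ever emitted by the source.
    double pushdownSeconds = matching * perElementNs / 1e9;

    System.out.printf(Locale.ROOT, "fused: %.1fs, pushdown: %.1fs, ratio: %.1fx%n",
        fusedSeconds, pushdownSeconds, fusedSeconds / pushdownSeconds);
  }
}
```

With ~8B input records and only ~22M survivors, even a small fixed per-element cost is dominated by the pre-filter volume in the fused version; the actual numbers of course depend on the runner.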

The faster job uses a tweaked AvroSource.readNextRecord
<https://github.com/apache/beam/blob/v2.33.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java#L642-L652>:

@Override
public boolean readNextRecord() throws IOException {
  boolean recordRead = false;

  while (currentRecordIndex < numRecords) {
    Object record = reader.read(null, decoder);
    currentRecord =
        (mode.parseFn == null)
            ? ((T) record)
            : mode.parseFn.apply((GenericRecord) record);
    currentRecordIndex++;

    if (mode.filterFn == null
        || ((SerializableFunction<T, Boolean>) mode.filterFn).apply(currentRecord)) {
      recordRead = true;
      break;
    }
  }
  return recordRead;
}
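Stripped of the Beam plumbing, the change boils down to advancing the reader until a record passes the predicate, so non-matching records never leave the source. A minimal self-contained sketch of that loop (plain Java, no Beam; `SimpleReader` and the names are hypothetical stand-ins):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

public class EagerFilterReader {

  /** Reads from an underlying iterator, skipping records that fail the predicate. */
  static final class SimpleReader<T> {
    private final Iterator<T> source;
    private final Predicate<T> filter;
    private T current;

    SimpleReader(Iterator<T> source, Predicate<T> filter) {
      this.source = source;
      this.filter = filter;
    }

    /** Mirrors readNextRecord(): advance until a record passes, or input is exhausted. */
    boolean readNextRecord() {
      while (source.hasNext()) {
        T record = source.next(); // decoding happens here in the real AvroSource
        if (filter == null || filter.test(record)) {
          current = record;       // only matching records are ever emitted
          return true;
        }
      }
      return false;
    }

    T getCurrent() {
      return current;
    }
  }

  public static void main(String[] args) {
    List<String> input = List.of("value", "other", "value", "noise", "value");
    SimpleReader<String> reader = new SimpleReader<>(input.iterator(), "value"::equals);

    List<String> emitted = new ArrayList<>();
    while (reader.readNextRecord()) {
      emitted.add(reader.getCurrent());
    }
    System.out.println(emitted.size()); // only matching records reach downstream
  }
}
```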


filterFn is provided by the user from outside and eagerly filters out
records before they are emitted, e.g. (in scio):


object ReadFullWithFilterPushdownJob {

  def main(cmdArgs: Array[String]): Unit = {

    val (sc, args) = ContextAndArgs(cmdArgs)
    val inputPath = args("input")

    sc.customInput("predicatedAvro",
      new AvroRead[ASchema](inputPath, classOf[ASchema],
        x => x.field == "value")
    )

    sc.run()
  }

}


On Thu, Oct 28, 2021 at 8:07 PM Reuven Lax <[email protected]> wrote:

> It would also help if you shared your code.
>
> On Thu, Oct 28, 2021 at 5:05 PM Reuven Lax <[email protected]> wrote:
>
>> Which runner are we using? Fusions (at least for non-portable pipelines)
>> is generally implemented by the runner, not by Beam.
>>
>> On Thu, Oct 28, 2021 at 3:05 PM Kirill Panarin <[email protected]>
>> wrote:
>>
>>> Hello 👋!
>>>
>>> We did some experiments with pushing the filter query down to the AvroSource
>>> and it significantly improved the performance of the job. The test job was
>>> simple: it reads about 8B Avro specific records, filters them down to
>>> 22M, and at the end of the day just counts them.
>>>
>>> The version of the job with the built-in AvroIO was 4x slower than
>>> the job with the tweaked AvroIO/Source, which eagerly drops unneeded
>>> records by applying a user-provided T => Boolean function. For the job with
>>> the built-in AvroIO, the logs report that the Avro Read step is fused
>>> with the subsequent filtering ParDo.
>>>
>>> The main question is: why doesn't fusing AvroIO with the filtering ParDo
>>> have the same effect as the explicit query pushdown to the AvroSource?
>>> Does this mean that even though these steps are fused, Beam still
>>> serializes/deserializes elements to transfer them from the Read step to the
>>> filtering ParDo? The only feasible explanation we have is that the
>>> difference in performance could be attributed to the lower overhead of
>>> coding/decoding SpecificRecords, but this seems unnecessary for fused steps.
>>>
>>> Thank you!
>>> Kirill
>>>
>>>
>>>
>>>
