We are using DataflowRunner. The job is actually implemented in scio
<https://github.com/spotify/scio>:
// ASchema extends SpecificRecord
object ReadFullWithFilterJob {
  def main(cmdArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdArgs)
    val inputPath = args("input")
    sc.avroFile[ASchema](inputPath)
      .filter(x => x.field == "value")
      .count
    sc.run()
  }
}
When translated to Beam it would look like this:
import org.apache.avro.specific.SpecificRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.values.PCollection;
public class Test {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    // Read Avro-generated classes from files on GCS
    PCollection<Long> count =
        p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro"))
            // Avro string fields need equals(), not ==
            .apply(Filter.by((AvroAutoGenClass x) -> "value".equals(x.field.toString())))
            .apply(Count.globally());
    p.run();
  }
}
So basically just AvroIO -> Filter -> Count.
The job which runs faster uses the tweaked AvroSource.readNextRecord
<https://github.com/apache/beam/blob/v2.33.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java#L642-L652>
:
@Override
public boolean readNextRecord() throws IOException {
  if (currentRecordIndex >= numRecords) {
    return false;
  }
  boolean recordRead = false;
  while (currentRecordIndex < numRecords) {
    // read the next record on every iteration, so filtered-out
    // records are skipped rather than re-parsed
    Object record = reader.read(null, decoder);
    currentRecord =
        (mode.parseFn == null)
            ? ((T) record)
            : mode.parseFn.apply((GenericRecord) record);
    currentRecordIndex++;
    if (mode.filterFn == null
        || ((SerializableFunction<T, Boolean>) mode.filterFn).apply(currentRecord)) {
      recordRead = true;
      break;
    }
  }
  return recordRead;
}
filterFn is provided by the user from outside and eagerly filters out
records, e.g. (in scio):
object ReadFullWithFilterPushdownJob {
  def main(cmdArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdArgs)
    val inputPath = args("input")
    sc.customInput(
      "predicatedAvro",
      new AvroRead[ASchema](inputPath, classOf[ASchema], x => x.field == "value")
    )
    sc.run()
  }
}
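To make the pushdown semantics concrete, here is a minimal in-memory model of that read loop in plain Java (FilteringReader, the Predicate-based filterFn, and the record list are illustrative stand-ins, not Beam APIs): the source keeps advancing until a record passes the filter, so rejected records are dropped before they ever leave the source or reach a downstream step.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Simplified model of the tweaked readNextRecord: advance past
// records that fail filterFn so only matches become "current".
class FilteringReader<T> {
  private final Iterator<T> reader;
  private final Predicate<T> filterFn; // null => no pushdown, emit everything
  private T currentRecord;

  FilteringReader(List<T> records, Predicate<T> filterFn) {
    this.reader = records.iterator();
    this.filterFn = filterFn;
  }

  boolean readNextRecord() {
    while (reader.hasNext()) {
      T record = reader.next();
      if (filterFn == null || filterFn.test(record)) {
        currentRecord = record;
        return true; // a matching record is now current
      }
      // non-matching records are skipped here, inside the source
    }
    return false; // input exhausted
  }

  T getCurrent() {
    return currentRecord;
  }
}
```

A downstream consumer that loops on readNextRecord() only ever sees matching records, which is the whole point of the pushdown: non-matching records never get encoded, transferred, or decoded.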
On Thu, Oct 28, 2021 at 8:07 PM Reuven Lax <[email protected]> wrote:
> It would also help if you shared your code.
>
> On Thu, Oct 28, 2021 at 5:05 PM Reuven Lax <[email protected]> wrote:
>
>> Which runner are we using? Fusion (at least for non-portable pipelines)
>> is generally implemented by the runner, not by Beam.
>>
>> On Thu, Oct 28, 2021 at 3:05 PM Kirill Panarin <[email protected]>
>> wrote:
>>
>>> Hello 👋!
>>>
>>> We did some experiments with pushing the filter query down to the AvroSource,
>>> and it significantly improved the performance of the job. The test job was
>>> simple: it reads about 8B Avro specific records, filters them down to
>>> 22M, and at the end of the day just counts them.
>>>
>>> The version of the job with the built-in AvroIO was 4x slower than
>>> the job with the tweaked AvroIO/Source, which eagerly drops unneeded
>>> records by applying a user-provided T => Boolean function. For the job with
>>> the built-in AvroIO, we see in the logs that the Avro Read step is fused
>>> with the following filtering ParDo.
>>>
>>> The main question is: why doesn't fusing AvroIO with the filtering ParDo
>>> have the same effect as explicitly pushing the query down to the
>>> AvroSource? Does this mean that even though these steps are fused, Beam
>>> still serializes/deserializes records to transfer them from the Read step
>>> to the filtering ParDo? The only feasible explanation we have is that the
>>> difference in performance could be attributed to the lower overhead of
>>> encoding/decoding SpecificRecords, but this seems unnecessary for fused steps.
>>>
>>> Thank you!
>>> Kirill