Hi Boyuan,

       Thank you for your response. Just to confirm my understanding: does
each shard's output get batched into memory in DoFnOutputManager [1] before
being passed to downstream operations?

       I am trying to understand the root cause of OOMs in executors when I
increase the max number of records to be read without changing the
executors' memory, their number, or the CPU cores assigned.

      Below is a snippet for reading from Kafka:

      // Bounded read from Kafka: stops after getKafkaRecordsToBeRead() records
      PTransform<PBegin, PCollection<KV<String, String>>> kafka =
        KafkaIO.<String, String>read()
                .withBootstrapServers(options.getBootstrap())
                .withTopic(options.getTopic())
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withMaxNumRecords(options.getKafkaRecordsToBeRead())
                .commitOffsetsInFinalize()
                .withConsumerConfigUpdates(map)
                .withoutMetadata();
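
      For completeness, this is roughly how the transform above is applied
in the pipeline (the step names, the FormatRecords lambda, and
options.getOutputPath() below are placeholders, not my exact code):

      Pipeline pipeline = Pipeline.create(options);
      pipeline
          .apply("ReadFromKafka", kafka)
          .apply("FormatRecords",
              MapElements.into(TypeDescriptors.strings())
                  .via((KV<String, String> kv) -> kv.getKey() + "," + kv.getValue()))
          .apply("WriteToText", TextIO.write().to(options.getOutputPath()));
      pipeline.run().waitUntilFinish();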


[1]
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java#L244


 Thank You,
Shrikant Bang


On Mon, Jan 11, 2021 at 11:54 PM Boyuan Zhang <[email protected]> wrote:

> +dev <[email protected]>
>
> Hi Shrikant,
> If you look into the expansion of BoundedReadFromUnboundedSource [1], you
> will notice that it expands into Create single shard -> Split into
> multiple shards -> Read from one shard. The number of records from one
> shard will not be larger than 10,000 and the number of shards will not be
> larger than 100 [2].
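>
> To make that concrete, the split roughly works out like the sketch below
> (illustrative arithmetic only, not the actual Beam source):
>
>   long maxRecords = 1_000_000L;      // e.g. the value passed to withMaxNumRecords
>   long recordsPerShardCap = 10_000L; // per-shard record cap mentioned above
>   int maxShards = 100;               // shard-count cap mentioned above
>
>   int numShards = (int) Math.min(maxShards,
>       Math.max(1, (maxRecords + recordsPerShardCap - 1) / recordsPerShardCap));
>   long recordsPerShard = (maxRecords + numShards - 1) / numShards;
>   // 1,000,000 records -> 100 shards of ~10,000 records each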
>
> Back to your questions: the OutputReceiver outputs each received element
> to the downstream operation immediately; it does not keep records batched
> in memory. But you are right that BoundedReadFromUnboundedSource is
> memory-sensitive, especially if your records are large or your downstream
> operations consume a lot of memory.
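>
> As a minimal sketch (a hypothetical pass-through DoFn, not code from
> Beam), each call to out.output(...) hands the element straight to the
> downstream operation; nothing accumulates in the output manager:
>
>   static class PassThroughFn extends DoFn<KV<String, String>, KV<String, String>> {
>     @ProcessElement
>     public void processElement(@Element KV<String, String> element,
>         OutputReceiver<KV<String, String>> out) {
>       out.output(element); // emitted immediately, not buffered
>     }
>   }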
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L93-L121
> [2]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L156-L161
>
>
> On Mon, Jan 11, 2021 at 5:48 AM shrikant bang <
> [email protected]> wrote:
>
>> Hi Team,
>>
>> I have ETL use cases with Kafka as the source (in *Batch* mode)
>> with SparkRunner. I am trying to understand the internals of
>> KafkaIO.Read.
>>
>> Can someone please confirm if my understanding is correct?
>>
>>    - WindowedContextOutputReceiver is used for collecting Kafka records
>> read by KafkaIO.Read through BoundedReadFromUnboundedSource [1].
>>    - All read Kafka records get stored in memory and are spilled
>> downstream once the loop ends in the read function [2].
>>
>> Ref :
>> [1] :
>> https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L206
>>
>> [2] :
>> https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L201
>>
>>
>>
>> Thank You,
>> Shrikant Bang.
>>
>> On Sun, Jan 10, 2021 at 11:43 PM shrikant bang <
>> [email protected]> wrote:
>>
>>> Hi Team,
>>>
>>>    I have the below questions/understandings about KafkaIO.Read in batch
>>> mode:
>>>
>>>    1. From debugging, I built the understanding that KafkaIO converts the
>>>    unbounded stream into a bounded read and *buffers all records* until
>>>    either criterion is met - max records / max time to read.
>>>    If this understanding is correct, then the read is memory-intensive, as
>>>    KafkaIO has to buffer all read records before passing them downstream.
>>>    Is my understanding correct?
>>>
>>>    2. If #1 is correct, then is there any way we can keep emitting
>>>    records downstream instead of buffering them in memory in the KafkaIO
>>>    read operation (in batch mode)?
>>>
>>>
>>> Thank You,
>>> Shrikant Bang
>>>
>>
