Thanks for bringing this up, Sameer.

On Fri, Jan 15, 2021 at 1:18 PM Boyuan Zhang <[email protected]> wrote:

> +Chamikara Jayalath <[email protected]>
>
> Hi Sameer,
>
> Thanks for reaching out!
>
> We will expose *commitOffsetsInFinalize* to the Python ReadFromKafka
> transform when we have custom window support in the Python SDK, which
> should be coming soon.
>

Looking at https://github.com/apache/beam/pull/12572, it seems like you just
need support for FixedWindows, right? I believe this should work now since
Dataflow Python multi-language pipelines use portable job submission by
default.
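For reference, a rough sketch of what the Python side could look like once
the option is plumbed through. Note this is not an existing API: the
commit_offset_in_finalize keyword and the consumer settings shown are
assumptions that mirror commitOffsetsInFinalize() on the Java KafkaIO.Read
transform.

    # Hypothetical sketch: assumes a commit_offset_in_finalize keyword is
    # exposed on the Python ReadFromKafka wrapper, mirroring
    # commitOffsetsInFinalize() in the Java KafkaIO.Read transform.
    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka

    with beam.Pipeline() as pipeline:
        records = pipeline | 'Read from Kafka' >> ReadFromKafka(
            consumer_config={
                'bootstrap.servers': 'localhost:9092',
                'group.id': 'dev_streaming_dr_beam_pipeline',
                # auto-commit stays off; offsets would be committed when
                # bundles are finalized instead
                'enable.auto.commit': 'false',
            },
            topics=['mongo_kafka_connect.requests'],
            commit_offset_in_finalize=True,  # assumed parameter name
        )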


>
> In terms of *TimestampPolicyFactory*, if you are using the built-in types,
> like ProcessingTimePolicy
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L58-L60>,
> LogAppendTimePolicy
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L68-L70>
> and withCreateTime
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L72-L89>,
> it's not hard to expose them to ReadFromKafka.
>

Yeah, exposing the existing policies should not be too hard, but defining new
policies (or any other option that requires second-order functions) requires
cross-language UDF support, which is not available yet.
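To illustrate, exposing just the built-ins could look something like the
sketch below on the Python side; the timestamp_policy parameter and its
string values are assumptions, not an existing API.

    # Hypothetical sketch: assumes the built-in policies are surfaced as plain
    # string options on ReadFromKafka; a user-defined TimestampPolicyFactory
    # would still need cross-language UDF support.
    from apache_beam.io.kafka import ReadFromKafka

    read_with_create_time = ReadFromKafka(
        consumer_config={
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'dev_streaming_dr_beam_pipeline',
        },
        topics=['mongo_kafka_connect.requests'],
        # assumed values mirroring the Java factories:
        # 'ProcessingTime' | 'CreateTime' | 'LogAppendTime'
        timestamp_policy='CreateTime',
    )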


>
> Another interesting topic you have mentioned in
> https://github.com/apache/beam/pull/12572 is that you also want to
> retrieve KafkaRecord from ReadFromKafka instead of bytes. That requires
> KafkaRecord to have the same coder in the Python SDK as in the Java SDK. It
> might be future work for x-lang Kafka.
>

This is because the current ReadFromKafka transform uses the
Read.withoutMetadata() transform [1].
I think the current Beam Schema support in Python should be adequate to expand
this and support a PCollection<Row> that represents a
PCollection<KafkaRecord> [2] in Python. +Brian Hulette
<[email protected]> to confirm.

[1]
https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L595
[2]
https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
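
For illustration, consuming such a schema'd PCollection<Row> from Python
might look like the sketch below; the Row field names (topic, partition,
offset, key, value) are assumptions based on KafkaRecord, not a finalized
schema.

    # Hypothetical sketch: assumes ReadFromKafka could return each record as a
    # Beam Row whose fields mirror KafkaRecord; the field names are assumptions.
    import apache_beam as beam

    with beam.Pipeline() as p:
        keyed_by_offset = (
            p
            # stand-in for a schema-aware ReadFromKafka output
            | 'Fake Kafka rows' >> beam.Create([
                beam.Row(topic='mongo_kafka_connect.requests', partition=0,
                         offset=42, key=b'doc-1', value=b'{}'),
            ])
            | 'Key by source position' >> beam.Map(
                lambda row: ('%s/%d@%d' % (row.topic, row.partition, row.offset),
                             row.value))
        )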

Thanks,
Cham


>
> On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <
> [email protected]> wrote:
>
>> Hello,
>>
>> I am using Beam's cross-language support for reading from Kafka, but it is
>> missing some features available in the Java SDK that I would like to use.
>> Specifically, I am interested in the Kafka Commit Transform
>> <https://github.com/apache/beam/pull/12572> feature in the Java SDK. This
>> will also require being able to specify whether the metadata is needed as
>> part of the KafkaIO transform creation. In addition, the
>> `commitOffsetsInFinalize` and `TimestampPolicyFactory` parameters are also
>> missing from the Python wrapper.
>>
>> My use case is as follows:
>> I have Kafka topics that have data corresponding to the Mongo change
>> streams produced by the Mongo Source Kafka connector. In my pipeline, I
>> read these updated mongo documents, apply some transformations and stream
>> them to BigQuery.
>>
>> Currently, I can only use `auto.commit` from the Kafka consumer config,
>> and I believe the offsets are committed after the consumer in KafkaIO
>> finishes reading the messages. If there are any errors in later stages of
>> my pipeline, or if the pipeline is restarted and can't be drained
>> gracefully, I will lose the already-acked messages. Hence, I want to commit
>> the offsets only after they are successfully written to BigQuery.
>>
>> Here is a snippet of my pipeline code.
>>
>>> def run(self, pipeline: Pipeline):
>>>    consumer_config = {
>>>       'bootstrap.servers': self.bootstrap_servers,
>>>       'auto.offset.reset': 'latest',
>>>       # Ideally we want auto.commit disabled, but we need a way to
>>>       # acknowledge the messages manually.
>>>       # 'enable.auto.commit': 'false',  # messages must be acked explicitly
>>>       # but prevents loss in case of failures
>>>       # Keep a high value since manual commits are not supported and
>>>       # messages will be lost if there is an error in the pipeline.
>>>       'auto.commit.interval.ms': '60000',
>>>       'group.id': 'dev_streaming_dr_beam_pipeline'
>>>    }
>>>
>>>    streamed_dr_kvs_raw = (
>>>          pipeline
>>>          | 'Read from Kafka Stream' >>
>>>          ReadFromKafka(
>>>             consumer_config=consumer_config,
>>>             topics=['mongo_kafka_connect.requests'],
>>>             max_num_records=1,
>>>          )
>>>    )
>>>
>>>    dr_data_stream = streamed_dr_kvs_raw | 'Kafka Message Deserializer' >> ParDo(MongoKafkaMessageDeserializer())
>>>
>>>    filter_record_fn: Callable[[MongoKafkaMessage], MongoKafkaMessage] = lambda elem: elem.mongo_document is not None
>>>    filtered_dr_ds_with_record_ts = (
>>>          dr_data_stream
>>>          | 'Filter empty values' >> Filter(filter_record_fn)
>>>          | 'Extract Timestamp' >> ParDo(MongoKafkaMessageTimestampExtractor())
>>>    )
>>>
>>>    # The lateness window defines how long the state is kept for older windows,
>>>    # and saving state for a longer duration can create memory pressure. Note,
>>>    # the state is saved on persistent disk but is optimistically fetched into
>>>    # local memory.
>>>    batched_dr_records = filtered_dr_ds_with_record_ts | 'Window Batch' >> WindowInto(
>>>       FixedWindows(30),  # 30 seconds
>>>       trigger=AfterWatermark(late=AfterProcessingTime(1 * 60)),  # any time late data arrives after 1 min
>>>       allowed_lateness=24 * 60 * 60,  # 24 hours for late data
>>>       accumulation_mode=AccumulationMode.DISCARDING
>>>    )
>>>
>>>    extract_mongo_doc_fn: Callable[[List[MongoKafkaMessage]], dict] = lambda elems: elems[0].mongo_document
>>>    de_duped_dr_records = (
>>>          batched_dr_records
>>>          | 'Group by Message Key' >> GroupBy('message_key')
>>>          | 'Select Latest' >> Latest.PerKey()
>>>          | 'Values' >> Values()
>>>          | 'Extract mongo document' >> Map(extract_mongo_doc_fn)
>>>    )
>>>
>>>    dr_with_features = de_duped_dr_records | 'Extract Features' >> ParDo(DealRequestEnricher())
>>>    dr_bq_rows = dr_with_features | 'Transform to BQ Row' >> ParDo(DealRequestFeaturesToBQTableRow())
>>>
>>>    table_schema = self.deal_request_schema_util.get_big_query_table_schema()
>>>    dr_bq_rows | 'Stream to BigQuery' >> BQSchemaUpdatingWriter(
>>>       dest_table_spec_vp=StaticValueProvider(str, self.dst_bq_table_spec),
>>>       dest_table_schema_vp=StaticValueProvider(str, table_schema),
>>>       time_partitioning_vp=StaticValueProvider(dict, {'type': 'DAY', 'field': 'created_at'}),
>>>       dead_letter_table_spec_vp=StaticValueProvider(str, self.dead_letter_table_spec)
>>>    )
>>>
>> Thanks!
>>
>> --
>> Regards,
>> Sameer Bhadouria.
>>
>
