I opened https://github.com/apache/beam/pull/13779 for exposing the built-in
timestamp policies and commitOffsetsInFinalize to ReadFromKafka.
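
To make that concrete, here is a rough sketch of how the Python side could
look once the PR lands. The parameter names (commit_offset_in_finalize,
timestamp_policy) and the policy value below follow the direction of the PR
but are not final yet, so treat this as illustrative rather than the actual
API:

    from apache_beam import Pipeline
    from apache_beam.io.kafka import ReadFromKafka

    with Pipeline() as p:
        records = (
            p
            | 'Read from Kafka' >> ReadFromKafka(
                consumer_config={
                    'bootstrap.servers': 'localhost:9092',
                    'group.id': 'my_consumer_group',
                },
                topics=['my_topic'],
                # Use the broker's log-append time as the element timestamp,
                # mirroring the built-in Java
                # TimestampPolicyFactory.withLogAppendTime().
                timestamp_policy='LogAppendTime',
                # Commit offsets back to Kafka only when output is finalized,
                # mirroring the Java commitOffsetsInFinalize() option.
                commit_offset_in_finalize=True,
            ))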

On Fri, Jan 15, 2021 at 3:09 PM Chamikara Jayalath <[email protected]>
wrote:

>
>
> On Fri, Jan 15, 2021 at 2:55 PM Boyuan Zhang <[email protected]> wrote:
>
>> Re Cham,
>>
>>> Looking at https://github.com/apache/beam/pull/12572, it seems like you
>>> just need support for FixedWindows, right? This should work now, I
>>> believe, since Dataflow Python multi-language pipelines use portable job
>>> submission by default.
>>
>> The problematic part is not FixedWindows but Reshuffle. In the Java SDK,
>> Reshuffle is expanded into
>> Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder())),
>> which is rejected by the Python SDK.
>>
>
> Ah, OK. It should not be rejected by the Python SDK anymore, but Reshuffle
> ('IdentityWindowFn') will indeed fail when executing the pipeline with
> Dataflow Runner v2 currently.
>
>
>> I can put up a simple PR to expose the built-in timestamp policies. Brian,
>> would you like to look into how much work would be needed to output
>> KafkaRecord to the Python SDK?
>>
>> On Fri, Jan 15, 2021 at 2:44 PM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>> Thanks for bringing this up, Sameer.
>>>
>>> On Fri, Jan 15, 2021 at 1:18 PM Boyuan Zhang <[email protected]> wrote:
>>>
>>>> +Chamikara Jayalath <[email protected]>
>>>>
>>>> Hi Sameer,
>>>>
>>>> Thanks for reaching out!
>>>>
>>>> We will expose *commitOffsetsInFinalize* to the Python ReadFromKafka
>>>> transform when we have custom window support in the Python SDK, which
>>>> should be coming soon.
>>>>
>>>
>>> Looking at https://github.com/apache/beam/pull/12572, it seems like you
>>> just need support for FixedWindows, right? This should work now, I
>>> believe, since Dataflow Python multi-language pipelines use portable job
>>> submission by default.
>>>
>>>
>>>>
>>>> In terms of *TimestampPolicyFactory*, if you are using the built-in
>>>> policies, like ProcessingTimePolicy
>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L58-L60>,
>>>> LogAppendTimePolicy
>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L68-L70>,
>>>> and withCreateTime
>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L72-L89>,
>>>> it's not hard to expose them to ReadFromKafka.
>>>>
>>>
>>> Yeah, exposing the existing policies should not be too hard, but defining
>>> new policies (or any other option that requires second-order functions)
>>> requires cross-language UDF support, which is not available yet.
>>>
>>>
>>>>
>>>> Another interesting topic you mentioned in
>>>> https://github.com/apache/beam/pull/12572 is that you also want to
>>>> retrieve KafkaRecord from ReadFromKafka instead of bytes. That requires
>>>> KafkaRecord to have the same coder in the Python SDK as in the Java SDK.
>>>> It might be future work for x-lang Kafka.
>>>>
>>>
>>> This is because the current ReadFromKafka transform exposes the
>>> Read.withoutMetadata() transform [1].
>>> I think the current Beam Schema support in Python should be adequate to
>>> expand this and support a PCollection<Row> that represents a
>>> PCollection<KafkaRecord> [2] in Python. +Brian Hulette
>>> <[email protected]> to confirm.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L595
>>> [2]
>>> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>>
>>>> On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <
>>>> [email protected]> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am using Beam's cross-language support for reading from Kafka, but it
>>>>> is missing some features available in the Java SDK that I would like to
>>>>> use. Specifically, I am interested in the Kafka Commit Transform
>>>>> <https://github.com/apache/beam/pull/12572> feature in the Java SDK. This
>>>>> will also require being able to specify whether the metadata is needed as
>>>>> part of the KafkaIO transform creation. In addition, the
>>>>> `commitOffsetsInFinalize` and `TimestampPolicyFactory` parameters are
>>>>> also missing from the Python wrapper.
>>>>>
>>>>> My use case is as follows:
>>>>> I have Kafka topics that carry data corresponding to the Mongo change
>>>>> streams produced by the Mongo Source Kafka connector. In my pipeline, I
>>>>> read these updated Mongo documents, apply some transformations, and
>>>>> stream them to BigQuery.
>>>>>
>>>>> Currently, I can only use `auto.commit` from the Kafka consumer config,
>>>>> and I believe the messages are acked (offsets committed) after the
>>>>> consumer in KafkaIO finishes reading them. If there are any errors in
>>>>> later stages of my pipeline, or if the pipeline is restarted and can't be
>>>>> drained gracefully, I will lose the already-acked messages. Hence, I want
>>>>> to commit the offsets only after they are successfully written to
>>>>> BigQuery.
>>>>>
>>>>> Here is a snippet of my pipeline code.
>>>>>
>>>>>> def run(self, pipeline: Pipeline):
>>>>>>    consumer_config = {
>>>>>>       'bootstrap.servers': self.bootstrap_servers,
>>>>>>       'auto.offset.reset': 'latest',
>>>>>>       # Ideally we want auto.commit disabled, but we need a way to acknowledge the messages manually
>>>>>>       # 'enable.auto.commit': 'false',  # messages must be acked explicitly but prevents loss in case of failures
>>>>>>       'auto.commit.interval.ms': '60000',  # keep a high value since manual commits are not supported and messages will be lost if there is an error in the pipeline
>>>>>>       'group.id': 'dev_streaming_dr_beam_pipeline'
>>>>>>    }
>>>>>>
>>>>>>    streamed_dr_kvs_raw = (
>>>>>>          pipeline
>>>>>>          | 'Read from Kafka Stream' >>
>>>>>>          ReadFromKafka(
>>>>>>             consumer_config=consumer_config,
>>>>>>             topics=['mongo_kafka_connect.requests'],
>>>>>>             max_num_records=1,
>>>>>>          )
>>>>>>    )
>>>>>>
>>>>>>    dr_data_stream = streamed_dr_kvs_raw | 'Kafka Message Deserializer' >> ParDo(MongoKafkaMessageDeserializer())
>>>>>>
>>>>>>    filter_record_fn: Callable[[MongoKafkaMessage], MongoKafkaMessage] = lambda elem: elem.mongo_document is not None
>>>>>>    filtered_dr_ds_with_record_ts = (
>>>>>>          dr_data_stream
>>>>>>          | 'Filter empty values' >> Filter(filter_record_fn)
>>>>>>          | 'Extract Timestamp' >> ParDo(MongoKafkaMessageTimestampExtractor())
>>>>>>    )
>>>>>>
>>>>>>    # The lateness window defines how long the state is kept for older windows
>>>>>>    # and saving state for a longer duration can create memory pressure. Note, the state is
>>>>>>    # saved in persistent disk but is optimistically fetched in the local memory.
>>>>>>    batched_dr_records = filtered_dr_ds_with_record_ts | 'Window Batch' >> WindowInto(
>>>>>>       FixedWindows(30),  # 30 seconds
>>>>>>       trigger=AfterWatermark(late=AfterProcessingTime(1 * 60)),  # any time late data arrives after 1 min
>>>>>>       allowed_lateness=24 * 60 * 60,  # 24 hours for late data
>>>>>>       accumulation_mode=AccumulationMode.DISCARDING
>>>>>>    )
>>>>>>
>>>>>>    extract_mongo_doc_fn: Callable[[List[MongoKafkaMessage]], dict] = lambda elems: elems[0].mongo_document
>>>>>>    de_duped_dr_records = (
>>>>>>          batched_dr_records
>>>>>>          | 'Group by Message Key' >> GroupBy('message_key')
>>>>>>          | 'Select Latest' >> Latest.PerKey()
>>>>>>          | 'Values' >> Values()
>>>>>>          | 'Extract mongo document' >> Map(extract_mongo_doc_fn)
>>>>>>    )
>>>>>>
>>>>>>    dr_with_features = de_duped_dr_records | 'Extract Features' >> ParDo(DealRequestEnricher())
>>>>>>    dr_bq_rows = dr_with_features | 'Transform to BQ Row' >> ParDo(DealRequestFeaturesToBQTableRow())
>>>>>>
>>>>>>    table_schema = self.deal_request_schema_util.get_big_query_table_schema()
>>>>>>    dr_bq_rows | 'Stream to BigQuery' >> BQSchemaUpdatingWriter(
>>>>>>       dest_table_spec_vp=StaticValueProvider(str, self.dst_bq_table_spec),
>>>>>>       dest_table_schema_vp=StaticValueProvider(str, table_schema),
>>>>>>       time_partitioning_vp=StaticValueProvider(dict, {'type': 'DAY', 'field': 'created_at'}),
>>>>>>       dead_letter_table_spec_vp=StaticValueProvider(str, self.dead_letter_table_spec)
>>>>>>    )
>>>>>>
>>>>>>
>>>>> Thanks!
>>>>>
>>>>> --
>>>>> Regards,
>>>>> Sameer Bhadouria.
>>>>>
>>>>
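
Sameer, as a side note: once commitOffsetsInFinalize is exposed, the consumer
config in your snippet should be able to drop the auto-commit workaround. A
rough sketch follows; the Python-side parameter name is not final and depends
on how the PR review goes, so treat it as illustrative:

    consumer_config = {
       'bootstrap.servers': self.bootstrap_servers,
       'auto.offset.reset': 'latest',
       # No periodic auto-commit; offsets are committed when output is
       # finalized, i.e. only for bundles the runner has durably committed.
       'enable.auto.commit': 'false',
       'group.id': 'dev_streaming_dr_beam_pipeline'
    }

    streamed_dr_kvs_raw = (
          pipeline
          | 'Read from Kafka Stream' >>
          ReadFromKafka(
             consumer_config=consumer_config,
             topics=['mongo_kafka_connect.requests'],
             commit_offset_in_finalize=True,  # name illustrative, per the PR
          )
    )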
