On Fri, Jan 15, 2021 at 2:55 PM Boyuan Zhang <[email protected]> wrote:
> Re Cham,
>
>> Looking at https://github.com/apache/beam/pull/12572 seems like you just
>> need support for FixedWindows, right? This should work now, I believe, since
>> Dataflow Python multi-language pipelines use portable job submission by
>> default.
>
> The problematic part is not the FixedWindows but the Reshuffle. In the Java SDK,
> Reshuffle is expanded into
>
>   Window.<KV<K, V>>into(new
>       IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
>
> which is rejected by the Python SDK.
>

Ah ok. It should not be rejected by the Python SDK anymore, but Reshuffle
('IdentityWindowFn') will indeed fail when executing the pipeline with
Dataflow Runner v2 currently.

> I can have a simple PR to expose the built-in timestamp policies. Brian, would
> you like to estimate how much work would be needed to output KafkaRecord to the
> Python SDK?
>
> On Fri, Jan 15, 2021 at 2:44 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>> Thanks for bringing this up Sameer.
>>
>> On Fri, Jan 15, 2021 at 1:18 PM Boyuan Zhang <[email protected]> wrote:
>>
>>> +Chamikara Jayalath <[email protected]>
>>>
>>> Hi Sameer,
>>>
>>> Thanks for reaching out!
>>>
>>> We will expose *commitOffsetsInFinalize* to the Python ReadFromKafka transform
>>> when we have CustomWindow support in the Python SDK, which should be coming
>>> soon.
>>>
>>
>> Looking at https://github.com/apache/beam/pull/12572 seems like you just
>> need support for FixedWindows, right? This should work now, I believe, since
>> Dataflow Python multi-language pipelines use portable job submission by
>> default.
>>
>>> In terms of *TimestampPolicyFactory*, if you are using the built-in
>>> types, like ProcessingTimePolicy
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L58-L60>,
>>> LogAppendTimePolicy
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L68-L70>
>>> and withCreateTime
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L72-L89>,
>>> it's not hard to expose them to ReadFromKafka.
>>>
>>
>> Yeah, exposing the existing policies should not be too hard, but defining new
>> policies (or any other option that requires second-order functions)
>> requires cross-language UDF support, which is not available yet.
>>
>>> Another interesting topic you have mentioned in
>>> https://github.com/apache/beam/pull/12572 is that you also want to
>>> retrieve KafkaRecord from ReadFromKafka instead of bytes. That requires
>>> KafkaRecord to have the same coder in the Python SDK as in the Java SDK. It
>>> might be future work for x-lang Kafka.
>>>
>>
>> This is because the current ReadFromKafka transform uses the
>> Read.withoutMetadata() transform [1].
>> I think the current Beam Schema support in Python should be adequate to
>> expand this and support a PCollection<Row> that represents a
>> PCollection<KafkaRecord> [2] in Python. +Brian Hulette
>> <[email protected]> to confirm.
>>
>> [1]
>> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L595
>> [2]
>> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
>>
>> Thanks,
>> Cham
>>
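As a rough illustration of that schema idea (a sketch only; the field names and
types below are assumptions, not the actual KafkaRecord schema), Kafka record
metadata could be modelled on the Python side as a schema'd NamedTuple and
carried as Rows:

    import typing

    import apache_beam as beam


    class KafkaRecordRow(typing.NamedTuple):
        # Illustrative fields only; the real KafkaRecord also carries
        # the timestamp type and headers.
        topic: str
        partition: int
        offset: int
        timestamp_ms: int
        key: typing.Optional[bytes]
        value: bytes


    # Registering RowCoder lets this type be encoded as a schema'd Row,
    # which is the representation a cross-language transform would hand back.
    beam.coders.registry.register_coder(KafkaRecordRow, beam.coders.RowCoder)
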
>>>
>>> On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <[email protected]> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am using Beam's cross-language support for reading from Kafka, but it
>>>> is missing some features available in the Java SDK that I would like to
>>>> use. Specifically, I am interested in the Kafka Commit Transform
>>>> <https://github.com/apache/beam/pull/12572> feature in the Java SDK. This
>>>> will also require being able to specify whether the metadata is needed as
>>>> part of the KafkaIO transform creation. In addition, the
>>>> `commitOffsetsInFinalize` and `TimestampPolicyFactory` parameters are
>>>> also missing from the Python wrapper.
>>>>
>>>> My use case is as follows:
>>>> I have Kafka topics that carry data corresponding to the Mongo change
>>>> streams produced by the Mongo Source Kafka connector. In my pipeline, I
>>>> read these updated Mongo documents, apply some transformations, and stream
>>>> them to BigQuery.
>>>>
>>>> Currently, I can only use `auto.commit` from the Kafka consumer config,
>>>> and I believe the offsets are committed after the consumer in KafkaIO
>>>> finishes reading the messages. If there are any errors in later stages of
>>>> my pipeline, or if the pipeline is restarted and can't be drained
>>>> gracefully, I will lose the already-acked messages. Hence, I want to commit
>>>> the offsets only after they are successfully written to BigQuery.
>>>>
>>>> Here is a snippet of my pipeline code:
>>>>
>>>>> def run(self, pipeline: Pipeline):
>>>>>     consumer_config = {
>>>>>         'bootstrap.servers': self.bootstrap_servers,
>>>>>         'auto.offset.reset': 'latest',
>>>>>         # Ideally we want auto.commit disabled, but we need a way to acknowledge the messages manually.
>>>>>         # 'enable.auto.commit': 'false',  # messages must be acked explicitly, but this prevents loss in case of failures
>>>>>         'auto.commit.interval.ms': '60000',  # keep a high value since manual commits are not supported and messages will be lost if there is an error in the pipeline
>>>>>         'group.id': 'dev_streaming_dr_beam_pipeline'
>>>>>     }
>>>>>
>>>>>     streamed_dr_kvs_raw = (
>>>>>         pipeline
>>>>>         | 'Read from Kafka Stream' >> ReadFromKafka(
>>>>>             consumer_config=consumer_config,
>>>>>             topics=['mongo_kafka_connect.requests'],
>>>>>             max_num_records=1,
>>>>>         )
>>>>>     )
>>>>>
>>>>>     dr_data_stream = streamed_dr_kvs_raw | 'Kafka Message Deserializer' >> ParDo(MongoKafkaMessageDeserializer())
>>>>>
>>>>>     filter_record_fn: Callable[[MongoKafkaMessage], bool] = lambda elem: elem.mongo_document is not None
>>>>>     filtered_dr_ds_with_record_ts = (
>>>>>         dr_data_stream
>>>>>         | 'Filter empty values' >> Filter(filter_record_fn)
>>>>>         | 'Extract Timestamp' >> ParDo(MongoKafkaMessageTimestampExtractor())
>>>>>     )
>>>>>
>>>>>     # The lateness window defines how long the state is kept for older windows,
>>>>>     # and saving state for a longer duration can create memory pressure. Note that the state is
>>>>>     # saved on persistent disk but is optimistically fetched into local memory.
>>>>>     batched_dr_records = filtered_dr_ds_with_record_ts | 'Window Batch' >> WindowInto(
>>>>>         FixedWindows(30),  # 30 seconds
>>>>>         trigger=AfterWatermark(late=AfterProcessingTime(1 * 60)),  # any time late data arrives after 1 min
>>>>>         allowed_lateness=24 * 60 * 60,  # 24 hours for late data
>>>>>         accumulation_mode=AccumulationMode.DISCARDING
>>>>>     )
>>>>>
>>>>>     extract_mongo_doc_fn: Callable[[List[MongoKafkaMessage]], dict] = lambda elems: elems[0].mongo_document
>>>>>     de_duped_dr_records = (
>>>>>         batched_dr_records
>>>>>         | 'Group by Message Key' >> GroupBy('message_key')
>>>>>         | 'Select Latest' >> Latest.PerKey()
>>>>>         | 'Values' >> Values()
>>>>>         | 'Extract mongo document' >> Map(extract_mongo_doc_fn)
>>>>>     )
>>>>>
>>>>>     dr_with_features = de_duped_dr_records | 'Extract Features' >> ParDo(DealRequestEnricher())
>>>>>     dr_bq_rows = dr_with_features | 'Transform to BQ Row' >> ParDo(DealRequestFeaturesToBQTableRow())
>>>>>
>>>>>     table_schema = self.deal_request_schema_util.get_big_query_table_schema()
>>>>>     dr_bq_rows | 'Stream to BigQuery' >> BQSchemaUpdatingWriter(
>>>>>         dest_table_spec_vp=StaticValueProvider(str, self.dst_bq_table_spec),
>>>>>         dest_table_schema_vp=StaticValueProvider(str, table_schema),
>>>>>         time_partitioning_vp=StaticValueProvider(dict, {'type': 'DAY', 'field': 'created_at'}),
>>>>>         dead_letter_table_spec_vp=StaticValueProvider(str, self.dead_letter_table_spec)
>>>>>     )
>>>>>
>>>>
>>>> Thanks!
>>>>
>>>> --
>>>> Regards,
>>>> Sameer Bhadouria.
>>>>
>>>
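For concreteness, a hypothetical sketch of what the read in the pipeline above
could look like once commitOffsetsInFinalize and the built-in timestamp
policies are exposed to Python. The commit_offset_in_finalize and
timestamp_policy parameter names below are assumptions about a future
ReadFromKafka API, not options available in the wrapper today:

    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka

    with beam.Pipeline() as pipeline:
        records = (
            pipeline
            | 'Read from Kafka Stream' >> ReadFromKafka(
                consumer_config={
                    'bootstrap.servers': 'localhost:9092',
                    # Offsets would be committed by the IO after finalization,
                    # so consumer auto-commit stays off.
                    'enable.auto.commit': 'false',
                    'group.id': 'dev_streaming_dr_beam_pipeline',
                },
                topics=['mongo_kafka_connect.requests'],
                commit_offset_in_finalize=True,   # assumed parameter: commit only after the bundle is finalized
                timestamp_policy='LogAppendTime', # assumed parameter: pick a built-in policy by name
            )
        )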
