Thanks for bringing this up, Sameer.

On Fri, Jan 15, 2021 at 1:18 PM Boyuan Zhang <[email protected]> wrote:
> +Chamikara Jayalath <[email protected]>
>
> Hi Sameer,
>
> Thanks for reaching out!
>
> We will expose *commitOffsetsInFinalize* to the py ReadFromKafka transform
> when we have CustomWindow support in the Python SDK, which should be
> coming soon.
>

Looking at https://github.com/apache/beam/pull/12572, it seems like you just
need support for FixedWindows, right? I believe this should work now, since
Dataflow Python multi-language pipelines use portable job submission by
default.

> In terms of *TimestampPolicyFactory*, if you are using the built-in types,
> like ProcessingTimePolicy
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L58-L60>,
> LogAppendTimePolicy
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L68-L70>
> and withCreateTime
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L72-L89>,
> it's not hard to expose them to ReadFromKafka.
>

Yeah, exposing the existing policies should not be too hard, but defining new
policies (or any other option that requires second-order functions) requires
cross-language UDF support, which is not available yet.

> Another interesting topic you have mentioned in
> https://github.com/apache/beam/pull/12572 is that you also want to retrieve
> KafkaRecord from ReadFromKafka instead of bytes. That requires KafkaRecord
> to have the same coder in the Python SDK as in the Java SDK. It might be
> future work for x-lang Kafka.
>

This is because the current ReadFromKafka transform exposes the
Read.withoutMetadata() transform [1]. I think the current Beam Schema support
in Python should be adequate to expand this and support a PCollection<Row>
that represents a PCollection<KafkaRecord> [2] in Python.
+Brian Hulette <[email protected]> to confirm.

[1] https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L595
[2] https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
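
To illustrate the shape this could take on the Python side, here is a rough
sketch of a read once commitOffsetsInFinalize and the built-in timestamp
policies are exposed. The commit_offset_in_finalize and timestamp_policy
parameter names (and the accepted policy values) are assumptions, not a
finalized API:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.transforms.window import FixedWindows

with beam.Pipeline() as p:
    records = (
        p
        | 'Read from Kafka' >> ReadFromKafka(
            consumer_config={
                'bootstrap.servers': 'localhost:9092',
                'group.id': 'offset_commit_demo',
                'enable.auto.commit': 'false',
            },
            topics=['my_topic'],
            # Assumed parameter names mirroring the Java KafkaIO options
            # discussed above; neither is in the released wrapper yet.
            commit_offset_in_finalize=True,
            timestamp_policy='LogAppendTime',
        )
        # Per the discussion above, FixedWindows support on the Python side
        # should be enough for the commit-on-finalize path.
        | 'Fixed windows' >> beam.WindowInto(FixedWindows(30))
    )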
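
Similarly, a hypothetical sketch of what consuming a Row-based output could
look like if KafkaRecord's metadata [2] were surfaced through Beam schemas.
The with_metadata option and the field names below are assumptions:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as p:
    keyed_values = (
        p
        | 'Read with metadata' >> ReadFromKafka(
            consumer_config={
                'bootstrap.servers': 'localhost:9092',
                'group.id': 'metadata_demo',
            },
            topics=['my_topic'],
            with_metadata=True,  # assumed option name, not in the wrapper yet
        )
        # Each element would then be a schema'd Row whose fields mirror
        # KafkaRecord's getters (topic, partition, offset, timestamp, ...).
        | 'Key by source partition' >> beam.Map(
            lambda row: ((row.topic, row.partition), row.value))
    )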
Thanks,
Cham

> On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <[email protected]> wrote:
>
>> Hello,
>>
>> I am using Beam's cross-language support for reading from Kafka, but it is
>> missing some features available in the Java SDK that I would like to use.
>> Specifically, I am interested in the Kafka Commit Transform
>> <https://github.com/apache/beam/pull/12572> feature in the Java SDK. This
>> will also require being able to specify whether the metadata is needed as
>> part of the KafkaIO transform creation. In addition, the
>> `commitOffsetsInFinalize` and `TimestampPolicyFactory` parameters are also
>> missing from the Python wrapper.
>>
>> My use case is as follows:
>> I have Kafka topics that have data corresponding to the Mongo change
>> streams produced by the Mongo Source Kafka connector. In my pipeline, I
>> read these updated Mongo documents, apply some transformations, and stream
>> them to BigQuery.
>>
>> Currently, I can only use `auto.commit` from the Kafka consumer config,
>> and I believe the messages are acked (offsets committed) after the consumer
>> in KafkaIO finishes reading them. If there are any errors in later stages
>> of my pipeline, or if the pipeline is restarted and can't be drained
>> gracefully, I will lose the already-acked messages. Hence, I want to commit
>> the offsets only after they are successfully written to BigQuery.
>>
>> Here is a snippet of my pipeline code.
>>
>>> def run(self, pipeline: Pipeline):
>>>     consumer_config = {
>>>         'bootstrap.servers': self.bootstrap_servers,
>>>         'auto.offset.reset': 'latest',
>>>         # Ideally we want auto.commit disabled, but we need a way to
>>>         # acknowledge the messages manually.
>>>         # 'enable.auto.commit': 'false',  # messages must be acked explicitly
>>>         # but prevents loss in case of failures
>>>         'auto.commit.interval.ms': '60000',  # keep a high value since manual
>>>         # commits are not supported and messages will be lost if there is an
>>>         # error in the pipeline
>>>         'group.id': 'dev_streaming_dr_beam_pipeline'
>>>     }
>>>
>>>     streamed_dr_kvs_raw = (
>>>         pipeline
>>>         | 'Read from Kafka Stream' >> ReadFromKafka(
>>>             consumer_config=consumer_config,
>>>             topics=['mongo_kafka_connect.requests'],
>>>             max_num_records=1,
>>>         )
>>>     )
>>>
>>>     dr_data_stream = streamed_dr_kvs_raw | 'Kafka Message Deserializer' >> ParDo(MongoKafkaMessageDeserializer())
>>>
>>>     filter_record_fn: Callable[[MongoKafkaMessage], bool] = lambda elem: elem.mongo_document is not None
>>>     filtered_dr_ds_with_record_ts = (
>>>         dr_data_stream
>>>         | 'Filter empty values' >> Filter(filter_record_fn)
>>>         | 'Extract Timestamp' >> ParDo(MongoKafkaMessageTimestampExtractor())
>>>     )
>>>
>>>     # The lateness window defines how long the state is kept for older windows,
>>>     # and saving state for a longer duration can create memory pressure. Note,
>>>     # the state is saved on persistent disk but is optimistically fetched into
>>>     # local memory.
>>>     batched_dr_records = filtered_dr_ds_with_record_ts | 'Window Batch' >> WindowInto(
>>>         FixedWindows(30),  # 30 seconds
>>>         trigger=AfterWatermark(late=AfterProcessingTime(1 * 60)),  # any time late data arrives after 1 min
>>>         allowed_lateness=24 * 60 * 60,  # 24 hours for late data
>>>         accumulation_mode=AccumulationMode.DISCARDING
>>>     )
>>>
>>>     extract_mongo_doc_fn: Callable[[List[MongoKafkaMessage]], dict] = lambda elems: elems[0].mongo_document
>>>     de_duped_dr_records = (
>>>         batched_dr_records
>>>         | 'Group by Message Key' >> GroupBy('message_key')
>>>         | 'Select Latest' >> Latest.PerKey()
>>>         | 'Values' >> Values()
>>>         | 'Extract mongo document' >> Map(extract_mongo_doc_fn)
>>>     )
>>>
>>>     dr_with_features = de_duped_dr_records | 'Extract Features' >> ParDo(DealRequestEnricher())
>>>     dr_bq_rows = dr_with_features | 'Transform to BQ Row' >> ParDo(DealRequestFeaturesToBQTableRow())
>>>
>>>     table_schema = self.deal_request_schema_util.get_big_query_table_schema()
>>>     dr_bq_rows | 'Stream to BigQuery' >> BQSchemaUpdatingWriter(
>>>         dest_table_spec_vp=StaticValueProvider(str, self.dst_bq_table_spec),
>>>         dest_table_schema_vp=StaticValueProvider(str, table_schema),
>>>         time_partitioning_vp=StaticValueProvider(dict, {'type': 'DAY', 'field': 'created_at'}),
>>>         dead_letter_table_spec_vp=StaticValueProvider(str, self.dead_letter_table_spec)
>>>     )
>>>
>> Thanks!
>>
>> --
>> Regards,
>> Sameer Bhadouria.
>
