As Raghu said, just apply a regular ParDo and return a PCollectionTuple. After that you can extract your success records (via one TupleTag) and your dead-letter records (via another TupleTag) and do whatever you want with them.
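A minimal sketch of that pattern, for illustration only: `input` is assumed to be a `PCollection<KV<String, String>>` coming out of KafkaIO, and `parse` stands in for whatever validation/deserialization step can fail — neither name is from the thread.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// Tags for the two outputs; anonymous subclasses preserve type information.
final TupleTag<KV<String, String>> successTag = new TupleTag<KV<String, String>>() {};
final TupleTag<KV<String, String>> deadLetterTag = new TupleTag<KV<String, String>>() {};

PCollectionTuple results =
    input.apply(
        "ParseOrDeadLetter",
        ParDo.of(
                new DoFn<KV<String, String>, KV<String, String>>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    try {
                      // Hypothetical parsing/validation that may throw.
                      c.output(parse(c.element()));
                    } catch (Exception e) {
                      // Route the failing record to the dead-letter output
                      // instead of crashing the pipeline.
                      c.output(deadLetterTag, c.element());
                    }
                  }
                })
            .withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

PCollection<KV<String, String>> ok = results.get(successTag);
PCollection<KV<String, String>> deadLetters = results.get(deadLetterTag);
```

From there you can write `deadLetters` to a separate Kafka topic, GCS, etc.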
Raghu Angadi <rang...@google.com> wrote on Wed., 24. Oct. 2018, 05:18:

> User can read serialized bytes from KafkaIO and deserialize explicitly in
> a ParDo, which gives complete control on how to handle record errors. This
> is what I would do if I needed to in my pipeline.
>
> If there is a transform in Beam that does this, it could be convenient for
> users in many such scenarios. This is simpler than each source supporting
> it explicitly.
>
> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <chamik...@google.com> wrote:
>
>> Given that KafkaIO uses the UnboundedSource framework, this is probably not
>> something that can easily be supported. We might be able to support similar
>> features when we have Kafka on top of Splittable DoFn, though.
>> So feel free to create a feature request JIRA for this.
>>
>> Thanks,
>> Cham
>>
>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <k...@google.com> wrote:
>>
>>> This is a great question. I've added the dev list to be sure it gets
>>> noticed by whoever may know best.
>>>
>>> Kenn
>>>
>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>>>
>>>> Hi,
>>>>
>>>> Is there a way to get a dead-letter output from a pipeline that uses a
>>>> KafkaIO connector for its input? As `TimestampPolicyFactory.withTimestampFn()`
>>>> takes only a SerializableFunction and not a ParDo, how would I be able to
>>>> produce a dead-letter output from it?
>>>>
>>>> I have the following pipeline defined that reads from a KafkaIO input:
>>>>
>>>> pipeline.apply(
>>>>     KafkaIO.<String, String>read()
>>>>         .withBootstrapServers(bootstrap)
>>>>         .withTopics(topics)
>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>         .withValueDeserializer(ConfigurableDeserializer.class)
>>>>         .updateConsumerProperties(
>>>>             ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>                 inputMessagesConfig))
>>>>         .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>         .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>         .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>         .withTimestampPolicyFactory(
>>>>             TimestampPolicyFactory.withTimestampFn(
>>>>                 new MessageTimestampExtractor(inputMessagesConfig)))
>>>>         .withReadCommitted()
>>>>         .commitOffsetsInFinalize())
>>>>
>>>> and I'd like to get dead-letter outputs when my timestamp extraction fails.
>>>>
>>>> Best,
>>>> Tobi
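Until Beam offers such a transform, one workaround in the spirit of Raghu's suggestion is to make the timestamp-extraction function itself total: never throw inside `withTimestampFn()`, but signal failure in a way a downstream ParDo can use to tag the record as a dead letter. The plain-Java helper below sketches that idea; the `"ts="` value format is an invented example, not Tobi's actual message schema.

```java
import java.util.Optional;

public class TimestampGuard {

  /**
   * Tries to pull an epoch-millis timestamp out of a raw record value.
   * Returns Optional.empty() instead of throwing, so the caller (e.g. a
   * ParDo) can route the record to a dead-letter output rather than let
   * the timestamp policy crash the pipeline.
   */
  public static Optional<Long> tryExtractTimestamp(String value) {
    if (value == null || !value.startsWith("ts=")) {
      return Optional.empty();
    }
    try {
      return Optional.of(Long.parseLong(value.substring(3)));
    } catch (NumberFormatException e) {
      return Optional.empty();
    }
  }

  public static void main(String[] args) {
    System.out.println(tryExtractTimestamp("ts=1540350000000"));
    System.out.println(tryExtractTimestamp("garbage"));
  }
}
```

Inside the timestamp function you would then fall back to something safe (e.g. the Kafka record timestamp) when the Optional is empty, and do the actual split into success/dead-letter outputs in a ParDo before or after the read, where multiple outputs are available.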