As Raghu said,

Just apply a regular ParDo and return a PCollectionTuple; after that you can
extract your Success records (TupleTag) and your DeadLetter
records (TupleTag) and do whatever you want with them.
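
Roughly something like this (just a sketch in Java; the tag names, the
KV<String, String> element type, the "records" collection and the parse(...)
helper are placeholders for your own deserialization/validation logic, not
anything coming from KafkaIO itself):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// One tag per output; anonymous subclasses keep the element type information.
final TupleTag<KV<String, String>> successTag = new TupleTag<KV<String, String>>() {};
final TupleTag<KV<String, String>> deadLetterTag = new TupleTag<KV<String, String>>() {};

// "records" is assumed to be the PCollection<KV<String, String>> coming out of
// your KafkaIO read (e.g. after .withoutMetadata()).
PCollectionTuple results =
    records.apply(
        "ParseOrDeadLetter",
        ParDo.of(
                new DoFn<KV<String, String>, KV<String, String>>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    try {
                      // Your own deserialization / timestamp extraction goes here;
                      // parse(...) is a hypothetical helper that throws on bad input.
                      c.output(parse(c.element()));
                    } catch (Exception e) {
                      // Anything that fails to parse goes to the dead letter output.
                      c.output(deadLetterTag, c.element());
                    }
                  }
                })
            .withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

PCollection<KV<String, String>> successRecords = results.get(successTag);
PCollection<KV<String, String>> deadLetters = results.get(deadLetterTag);

You can then write the dead letter collection to a separate sink (another
topic, a table, files) for inspection or replay.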


Raghu Angadi <rang...@google.com> schrieb am Mi., 24. Okt. 2018, 05:18:

> Users can read serialized bytes from KafkaIO and deserialize explicitly in
> a ParDo, which gives complete control over how to handle record errors. This
> is what I would do if I needed to in my pipeline.
>
> If there is a transform in Beam that does this, it could be convenient for
> users in many such scenarios. This is simpler than each source supporting
> it explicitly.
>
> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <chamik...@google.com>
> wrote:
>
>> Given that KafkaIO uses the UnboundedSource framework, this is probably not
>> something that can easily be supported. We might be able to support similar
>> features when we have Kafka on top of Splittable DoFn, though.
>>
>> So feel free to create a feature request JIRA for this.
>>
>> Thanks,
>> Cham
>>
>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <k...@google.com> wrote:
>>
>>> This is a great question. I've added the dev list to be sure it gets
>>> noticed by whoever may know best.
>>>
>>> Kenn
>>>
>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <tobias.kay...@ricardo.ch>
>>> wrote:
>>>
>>>>
>>>> Hi,
>>>>
>>>> Is there a way to get a Deadletter output from a pipeline that uses a
>>>> KafkaIO connector for its input? As `TimestampPolicyFactory.withTimestampFn()`
>>>> takes only a SerializableFunction and not a ParDo, how would I be able to
>>>> produce a Deadletter output from it?
>>>>
>>>> I have the following pipeline defined that reads from a KafkaIO input:
>>>>
>>>> pipeline.apply(
>>>>   KafkaIO.<String, String>read()
>>>>     .withBootstrapServers(bootstrap)
>>>>     .withTopics(topics)
>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>>>     .updateConsumerProperties(
>>>>         ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>     .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>     .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>     .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>     .withTimestampPolicyFactory(
>>>>         TimestampPolicyFactory.withTimestampFn(
>>>>             new MessageTimestampExtractor(inputMessagesConfig)))
>>>>     .withReadCommitted()
>>>>     .commitOffsetsInFinalize())
>>>>
>>>>
>>>> and I'd like to get deadletter outputs when my timestamp extraction fails.
>>>>
>>>> Best,
>>>> Tobi
>>>>
>>>>
