This is a great question. I've added the dev list to be sure it gets
noticed by whoever may know best.

Kenn

On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <tobias.kay...@ricardo.ch>
wrote:

>
> Hi,
>
> Is there a way to get a dead-letter output from a pipeline that uses a
> KafkaIO connector for its input? As
> `TimestampPolicyFactory.withTimestampFn()` takes only a
> SerializableFunction and not a ParDo, how would I be able to produce a
> dead-letter output from it?
>
> I have the following pipeline defined that reads from a KafkaIO input:
>
> pipeline.apply(
>   KafkaIO.<String, String>read()
>     .withBootstrapServers(bootstrap)
>     .withTopics(topics)
>     .withKeyDeserializer(StringDeserializer.class)
>     .withValueDeserializer(ConfigurableDeserializer.class)
>     .updateConsumerProperties(
>         ImmutableMap.of(
>             InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>     .updateConsumerProperties(
>         ImmutableMap.of("auto.offset.reset", "earliest"))
>     .updateConsumerProperties(
>         ImmutableMap.of("group.id", "beam-consumers"))
>     .updateConsumerProperties(
>         ImmutableMap.of("enable.auto.commit", "true"))
>     .withTimestampPolicyFactory(
>         TimestampPolicyFactory.withTimestampFn(
>             new MessageTimestampExtractor(inputMessagesConfig)))
>     .withReadCommitted()
>     .commitOffsetsInFinalize())
>
>
> and I would like to get dead-letter outputs when my timestamp extraction
> fails.
>
> Best,
> Tobi
>
>
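
One possible way to approach the question above, offered only as a sketch and not as a confirmed KafkaIO feature: make the timestamp function itself never throw, have it return a sentinel timestamp when extraction fails, and separate the failed records in a later step. In a Beam pipeline that later step would typically be a multi-output ParDo placed after the read. The example below models the idea in plain Java without Beam so it can run on its own. `DeadLetterSketch`, `extractTimestamp`, `safeTimestamp`, `route`, and the `FAILED` sentinel are all hypothetical names, and `extractTimestamp` merely stands in for `MessageTimestampExtractor`, whose real logic is not shown in this thread.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DeadLetterSketch {
    // Hypothetical sentinel meaning "timestamp extraction failed".
    // Assumption: no valid record legitimately carries this timestamp.
    static final Instant FAILED = Instant.EPOCH;

    // Hypothetical stand-in for the real extractor: parses epoch millis
    // and throws a RuntimeException on malformed or null input.
    static Instant extractTimestamp(String value) {
        return Instant.ofEpochMilli(Long.parseLong(value.trim()));
    }

    // Wrapper that never throws, so it is safe to use where only a plain
    // function (and no side output) is available.
    static Instant safeTimestamp(String value) {
        try {
            return extractTimestamp(value);
        } catch (RuntimeException e) {
            return FAILED;
        }
    }

    // Holds the two outputs of the routing step.
    static final class Routed {
        final List<String> main = new ArrayList<>();
        final List<String> deadLetter = new ArrayList<>();
    }

    // Downstream routing step: records carrying the sentinel go to the
    // dead-letter output, everything else goes to the main output.
    static Routed route(List<String> values) {
        Routed out = new Routed();
        for (String v : values) {
            if (FAILED.equals(safeTimestamp(v))) {
                out.deadLetter.add(v);
            } else {
                out.main.add(v);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Routed r = route(Arrays.asList("1540281900000", "not-a-timestamp"));
        System.out.println("main=" + r.main + " deadLetter=" + r.deadLetter);
    }
}
```

The sentinel is the weak point of this design: a record whose true timestamp equals the sentinel would be misrouted, and how a sentinel timestamp interacts with the watermark in a real pipeline is not addressed here.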
