To be clear, returning min_timestamp for unparsable records should not affect the watermark.
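
A minimal plain-Java sketch of this idea, not actual KafkaIO code. It assumes, hypothetically, that the record value carries an ISO-8601 timestamp string; the names `TimestampFallbackSketch`, `extractTimestamp` and `isUnparsable` are made up for illustration, and `Instant.MIN` stands in for Beam's `BoundedWindow.TIMESTAMP_MIN_VALUE`. The timestamp function never throws: it returns the sentinel instead, and a later step can test for the sentinel to drop or redirect the record.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

/** Plain-Java stand-in for a timestamp function with a min_timestamp fallback. */
final class TimestampFallbackSketch {

    // Assumption: stand-in for Beam's BoundedWindow.TIMESTAMP_MIN_VALUE.
    static final Instant MIN_TIMESTAMP = Instant.MIN;

    private TimestampFallbackSketch() {}

    /**
     * Parses the record value as an ISO-8601 instant (hypothetical record format).
     * Returns MIN_TIMESTAMP instead of throwing when the value is missing or malformed.
     */
    static Instant extractTimestamp(String rawValue) {
        if (rawValue == null) {
            return MIN_TIMESTAMP;
        }
        try {
            return Instant.parse(rawValue.trim());
        } catch (DateTimeParseException e) {
            return MIN_TIMESTAMP;
        }
    }

    /** True when the timestamp is the sentinel, so a later step can drop or redirect the record. */
    static boolean isUnparsable(Instant timestamp) {
        return MIN_TIMESTAMP.equals(timestamp);
    }
}
```

In a real pipeline the `isUnparsable` check would sit in the ParDo that follows the read.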
On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <rang...@google.com> wrote:

> How about returning min_timestamp? They would be dropped or redirected by
> the ParDo after that.
> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API, is
> this pipeline defined under kafkaio package?
>
> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> In this case, the user is attempting to handle errors when parsing the
>> timestamp. The timestamp controls the watermark for the UnboundedSource,
>> how would they control the watermark in a downstream ParDo?
>>
>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <rang...@google.com> wrote:
>>
>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <chamik...@google.com>
>>> wrote:
>>>
>>>> Ah nice. Yeah, if user can return full bytes instead of applying a
>>>> function that would result in an exception, this can be extracted by a
>>>> ParDo down the line.
>>>>
>>>
>>> KafkaIO does return bytes, and I think most sources should, unless there
>>> is a good reason not to.
>>> Given that, do we think Beam should provide a transform that makes it
>>> simpler to handle deadletter output? I think there was a thread about it in
>>> the past.
>>>
>>>
>>>>
>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>> jcgarc...@gmail.com> wrote:
>>>>
>>>>> As Raghu said,
>>>>>
>>>>> Just apply a regular ParDo and return a PCollectionTuple; after that
>>>>> you can extract your Success Records (TupleTag) and your DeadLetter
>>>>> records (TupleTag) and do whatever you want with them.
>>>>>
>>>>>
>>>>> Raghu Angadi <rang...@google.com> wrote on Wed, Oct 24, 2018,
>>>>> 05:18:
>>>>>
>>>>>> User can read serialized bytes from KafkaIO and deserialize
>>>>>> explicitly in a ParDo, which gives complete control on how to handle
>>>>>> record errors. This is what I would do if I needed to in my pipeline.
>>>>>>
>>>>>> If there is a transform in Beam that does this, it could be
>>>>>> convenient for users in many such scenarios. This is simpler than each
>>>>>> source supporting it explicitly.
>>>>>>
>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>> chamik...@google.com> wrote:
>>>>>>
>>>>>>> Given that KafkaIO uses the UnboundedSource framework, this is probably
>>>>>>> not something that can easily be supported. We might be able to support
>>>>>>> similar features when we have Kafka on top of Splittable DoFn though.
>>>>>>>
>>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Cham
>>>>>>>
>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <k...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> This is a great question. I've added the dev list to be sure it
>>>>>>>> gets noticed by whoever may know best.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Is there a way to get a Deadletter Output from a pipeline that
>>>>>>>>> uses a KafkaIO connector for its input? As
>>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes
>>>>>>>>> only a SerializableFunction and not a ParDo, how would I be able
>>>>>>>>> to produce a Deadletter output from it?
>>>>>>>>>
>>>>>>>>> I have the following pipeline defined that reads from a KafkaIO
>>>>>>>>> input:
>>>>>>>>>
>>>>>>>>> pipeline.apply(
>>>>>>>>>     KafkaIO.<String, String>read()
>>>>>>>>>         .withBootstrapServers(bootstrap)
>>>>>>>>>         .withTopics(topics)
>>>>>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>         .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>         .updateConsumerProperties(
>>>>>>>>>             ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>>>>                 inputMessagesConfig))
>>>>>>>>>         .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
>>>>>>>>>             "earliest"))
>>>>>>>>>         .updateConsumerProperties(ImmutableMap.of("group.id",
>>>>>>>>>             "beam-consumers"))
>>>>>>>>>         .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
>>>>>>>>>             "true"))
>>>>>>>>>         .withTimestampPolicyFactory(
>>>>>>>>>             TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>                 new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>         .withReadCommitted()
>>>>>>>>>         .commitOffsetsInFinalize())
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> and I would like to get deadletter outputs when my timestamp
>>>>>>>>> extraction fails.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Tobi
>>>>>>>>>
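
The approach suggested in the replies above (read raw bytes, deserialize explicitly in a ParDo, and emit success and dead-letter records as two tagged outputs) might look roughly like the plain-Java sketch below. It does not use Beam: the two lists stand in for the two `TupleTag` outputs of a `PCollectionTuple`, the payload is assumed (hypothetically) to be a decimal epoch-millis string, and `DeadLetterSplitSketch`, `Result` and `split` are illustrative names only.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Plain-Java stand-in for a ParDo with a success output and a dead-letter output. */
final class DeadLetterSplitSketch {

    /** Holds both outputs; in Beam these would be two TupleTags on a PCollectionTuple. */
    static final class Result {
        final List<Long> parsed = new ArrayList<>();
        final List<byte[]> deadLetters = new ArrayList<>();
    }

    private DeadLetterSplitSketch() {}

    /**
     * Deserializes each raw record explicitly. Records that fail to parse are kept
     * as raw bytes in the dead-letter output instead of failing the whole step.
     * Assumption: the payload is a UTF-8 decimal epoch-millis value.
     */
    static Result split(List<byte[]> rawRecords) {
        Result result = new Result();
        for (byte[] raw : rawRecords) {
            try {
                String text = new String(raw, StandardCharsets.UTF_8).trim();
                result.parsed.add(Long.parseLong(text));
            } catch (NumberFormatException e) {
                result.deadLetters.add(raw);
            }
        }
        return result;
    }
}
```

Keeping the original bytes in the dead-letter output means the failed records can be written elsewhere and inspected or replayed later.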