On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <lc...@google.com> wrote:
> You would have to return min timestamp for all records otherwise the
> watermark may have advanced and you would be outputting records that are
> droppably late.

That would be fine I guess. What’s the timestamp for a record that doesn’t
have one?

> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <rang...@google.com> wrote:
>
>> To be clear, returning min_timestamp for unparsable records should not
>> affect the watermark.
>>
>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <rang...@google.com> wrote:
>>
>>> How about returning min_timestamp? They would be dropped or redirected
>>> by the ParDo after that.
>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API, is
>>> this pipeline defined under the kafkaio package?
>>>
>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> In this case, the user is attempting to handle errors when parsing the
>>>> timestamp. The timestamp controls the watermark for the
>>>> UnboundedSource; how would they control the watermark in a downstream
>>>> ParDo?
>>>>
>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <rang...@google.com> wrote:
>>>>
>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath
>>>>> <chamik...@google.com> wrote:
>>>>>
>>>>>> Ah nice. Yeah, if the user can return full bytes instead of applying
>>>>>> a function that would result in an exception, this can be extracted
>>>>>> by a ParDo down the line.
>>>>>
>>>>> KafkaIO does return bytes, and I think most sources should, unless
>>>>> there is a good reason not to.
>>>>> Given that, do we think Beam should provide a transform that makes it
>>>>> simpler to handle deadletter output? I think there was a thread about
>>>>> it in the past.
>>>>>
>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia
>>>>>> <jcgarc...@gmail.com> wrote:
>>>>>>
>>>>>>> As Raghu said,
>>>>>>>
>>>>>>> Just apply a regular ParDo and return a PCollectionTuple. After
>>>>>>> that you can extract your success records (TupleTag) and your
>>>>>>> dead-letter records (TupleTag) and do whatever you want with them.
>>>>>>>
>>>>>>> Raghu Angadi <rang...@google.com> wrote on Wed, Oct 24, 2018, 05:18:
>>>>>>>
>>>>>>>> User can read serialized bytes from KafkaIO and deserialize
>>>>>>>> explicitly in a ParDo, which gives complete control on how to
>>>>>>>> handle record errors. This is what I would do if I needed to in my
>>>>>>>> pipeline.
>>>>>>>>
>>>>>>>> If there is a transform in Beam that does this, it could be
>>>>>>>> convenient for users in many such scenarios. This is simpler than
>>>>>>>> each source supporting it explicitly.
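A minimal sketch of the pattern Juan Carlos and Raghu describe above: read
the raw bytes (e.g. KafkaIO.<byte[], byte[]>read() with Kafka's
ByteArrayDeserializer, so rawValues below is a PCollection<byte[]> of
message values), deserialize in a ParDo, and split successes from failures
with TupleTags. The numeric payload, tag names, and rawValues are
illustrative assumptions, not from the thread:

    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;

    // One tag per output; anonymous subclasses let Beam infer element types.
    final TupleTag<Long> parsedTag = new TupleTag<Long>() {};
    final TupleTag<byte[]> deadLetterTag = new TupleTag<byte[]>() {};

    PCollectionTuple outputs =
        rawValues.apply(
            "ParseOrDeadLetter",
            ParDo.of(
                    new DoFn<byte[], Long>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        try {
                          // Illustrative decode step; throws on bad input.
                          c.output(
                              Long.parseLong(
                                  new String(c.element(), StandardCharsets.UTF_8).trim()));
                        } catch (Exception e) {
                          // Unparsable payloads go to the dead-letter output.
                          c.output(deadLetterTag, c.element());
                        }
                      }
                    })
                .withOutputTags(parsedTag, TupleTagList.of(deadLetterTag)));

    PCollection<Long> parsed = outputs.get(parsedTag);
    PCollection<byte[]> deadLetters = outputs.get(deadLetterTag);

The main output tag is the first argument to withOutputTags; every
additional output has to be declared up front in the TupleTagList.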
>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath
>>>>>>>> <chamik...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework, this is
>>>>>>>>> probably not something that can easily be supported. We might be
>>>>>>>>> able to support similar features when we have Kafka on top of
>>>>>>>>> Splittable DoFn though. So feel free to create a feature request
>>>>>>>>> JIRA for this.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Cham
>>>>>>>>>
>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <k...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> This is a great question. I've added the dev list to be sure it
>>>>>>>>>> gets noticed by whoever may know best.
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias
>>>>>>>>>> <tobias.kay...@ricardo.ch> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Is there a way to get a dead-letter output from a pipeline that
>>>>>>>>>>> uses a KafkaIO connector for its input? As
>>>>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>>>>>>>> SerializableFunction and not a ParDo, how would I be able to
>>>>>>>>>>> produce a dead-letter output from it?
>>>>>>>>>>>
>>>>>>>>>>> I have the following pipeline defined that reads from a KafkaIO
>>>>>>>>>>> input:
>>>>>>>>>>>
>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>     KafkaIO.<String, String>read()
>>>>>>>>>>>         .withBootstrapServers(bootstrap)
>>>>>>>>>>>         .withTopics(topics)
>>>>>>>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>         .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>         .updateConsumerProperties(
>>>>>>>>>>>             ImmutableMap.of(
>>>>>>>>>>>                 InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>>>>>>                 inputMessagesConfig))
>>>>>>>>>>>         .updateConsumerProperties(
>>>>>>>>>>>             ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>         .updateConsumerProperties(
>>>>>>>>>>>             ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>>         .updateConsumerProperties(
>>>>>>>>>>>             ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>>         .withTimestampPolicyFactory(
>>>>>>>>>>>             TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>                 new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>         .withReadCommitted()
>>>>>>>>>>>         .commitOffsetsInFinalize())
>>>>>>>>>>>
>>>>>>>>>>> and I'd like to get dead-letter outputs when my timestamp
>>>>>>>>>>> extraction fails.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Tobi
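Pulling the thread's answer together: a sketch of a timestamp function
that returns the minimum timestamp instead of throwing, which per Raghu's
note above should not affect the watermark, and which lets a downstream
ParDo redirect the bad records (as in the TupleTag sketch earlier in the
thread). SafeTimestampExtractor and its epoch-millis parsing are
illustrative assumptions, and note Raghu's caveat that
TimestampPolicyFactory.withTimestampFn() is not a public API:

    import org.apache.beam.sdk.io.kafka.KafkaRecord;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.joda.time.Instant;

    // Illustrative stand-in for MessageTimestampExtractor: never throws.
    // Records whose timestamp cannot be parsed get TIMESTAMP_MIN_VALUE,
    // which keeps them identifiable downstream.
    class SafeTimestampExtractor
        implements SerializableFunction<KafkaRecord<String, String>, Instant> {
      @Override
      public Instant apply(KafkaRecord<String, String> record) {
        try {
          // Assumes the payload carries epoch millis; adapt as needed.
          return new Instant(Long.parseLong(record.getKV().getValue().trim()));
        } catch (Exception e) {
          return BoundedWindow.TIMESTAMP_MIN_VALUE;
        }
      }
    }

A ParDo placed right after the read can then check
c.timestamp().equals(BoundedWindow.TIMESTAMP_MIN_VALUE) and route those
elements to a dead-letter output, since each element carries the timestamp
assigned by the policy.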