On Thu, Oct 25, 2018 at 10:02 AM Raghu Angadi <[email protected]> wrote:
> On Wed, Oct 24, 2018 at 11:54 PM Reuven Lax <[email protected]> wrote:
> [...]
>
>>> KafkaIO has a few in-built policies for watermark and timestamp that
>>> cover most use cases (including server time, which has the benefit of
>>> providing a perfect watermark). It also gives users fairly complete
>>> control over these if they choose. It seems reasonable for a policy to
>>> base its watermark only on parsable records, and to ignore unparsable
>>> records in the watermark calculation.
>>
>> But then doesn't that force the user to set max allowed lateness to
>> infinity, otherwise these records will be dropped?
>
> True, especially if there are any aggregations that need to be performed
> on the dead-letter output. I think I understand the issue better now. Can
> it be stated something like this:
>
> What we ideally want is two independent sources, one for the main source
> and one for the dead-letter output, each with its own timestamp and
> watermark characteristics.
>
> If so, yes, I don't think it is feasible with the UnboundedSource API.
> How does SDF enable this without special features from Beam?

I'm not sure I understand why this would require Kafka to behave as two
independent sources. Won't setting a non-negative-infinity timestamp (for
example, processing time) for failed records be enough?

Also (at least at some point) there were discussions on supporting SDFs
that report different watermarks for different outputs. More details are
available here:
https://docs.google.com/document/d/1BGc8pM1GOvZhwR9SARSVte-20XEoBUxrGJ5gTWXdv3c/edit

- Cham

> Raghu.
>
>>> It could even assign a timestamp that makes more logical sense in a
>>> particular application.
>>>
>>> On Wed, Oct 24, 2018 at 8:30 PM Kenneth Knowles <[email protected]> wrote:
>>>
>>>> Forgive me if this is naive or missing something, but here are my
>>>> thoughts on these alternatives:
>>>>
>>>> (0) The timestamp has to be pulled out in the source to control the
>>>> watermark.
>>>> Luke's point is important.
>>>>
>>>> (1) If bad records get min_timestamp, and they occur infrequently
>>>> enough, then the watermark will advance and they will all be dropped.
>>>> That will not allow output to a dead-letter queue.
>>>>
>>>> (2) If you always have min_timestamp records, or if bad records are
>>>> frequent, the watermark will never advance. So windows/aggregations
>>>> would never be considered complete. Triggers could be used to get
>>>> output anyhow, but it would never be a final answer. I think it is not
>>>> in the spirit of Beam to work this way. Pragmatically, no state could
>>>> ever be freed by a runner.
>>>>
>>>> In SQL there is an actual "dead letter" option when creating a table
>>>> that parses from a bytes source. If, for example, a JSON record cannot
>>>> be parsed to the expected schema - like maybe an Avro record got into
>>>> the stream, or the JSON doesn't match the expected schema - it is
>>>> output as-is to a user-specified dead-letter queue. I think this same
>>>> level of support is also required for records that cannot have
>>>> timestamps extracted in an unbounded source.
>>>>
>>>> In an SDF I think the function has enough control to do it all in
>>>> "userland", so Cham is right on here.
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Oct 24, 2018 at 6:54 PM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> That depends on the user's pipeline and how watermark advancement of
>>>>> the source may cause elements to become droppably late if they are
>>>>> emitted with the minimum timestamp.
>>>>>
>>>>> On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi <[email protected]> wrote:
>>>>>
>>>>>> I see.
>>>>>>
>>>>>> What I meant was to return min_timestamp for bad records in the
>>>>>> timestamp handler passed to KafkaIO itself, and the correct
>>>>>> timestamp for parsable records. That should work too, right?
>>>>>>
>>>>>> On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> Yes, that would be fine.
>>>>>>>
>>>>>>> The user could then use a ParDo which outputs to a DLQ for things
>>>>>>> it can't parse the timestamp for, and use outputWithTimestamp[1]
>>>>>>> for everything else.
>>>>>>>
>>>>>>> 1:
>>>>>>> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
>>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi <[email protected]> wrote:
>>>>>>>
>>>>>>>> Thanks. So returning min timestamp is OK, right (assuming the
>>>>>>>> application is fine with what it means)?
>>>>>>>>
>>>>>>>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> All records in Apache Beam have a timestamp. The default
>>>>>>>>> timestamp is the min timestamp defined here:
>>>>>>>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>>>>>>>>
>>>>>>>>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> You would have to return min timestamp for all records,
>>>>>>>>>>> otherwise the watermark may have advanced and you would be
>>>>>>>>>>> outputting records that are droppably late.
>>>>>>>>>>
>>>>>>>>>> That would be fine, I guess. What's the timestamp for a record
>>>>>>>>>> that doesn't have one?
>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> To be clear, returning min_timestamp for unparsable records
>>>>>>>>>>>> should not affect the watermark.
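The ParDo-based dead-letter routing discussed above boils down to a try/catch per record: parsable records go to the main output with their extracted timestamp, failures go to the DLQ output. A minimal plain-Java sketch of that routing logic (the `"<epoch-millis>|<payload>"` record format and `parseTimestamp` helper are hypothetical stand-ins, and the Beam API is omitted so the sketch stands alone):

```java
import java.time.Instant;
import java.util.List;

class TimestampRouting {
    // Hypothetical parse: expects "<epoch-millis>|<payload>"; throws on bad input.
    static Instant parseTimestamp(String record) {
        return Instant.ofEpochMilli(Long.parseLong(record.split("\\|", 2)[0]));
    }

    // Routes each record the way the DLQ ParDo would.
    static void route(List<String> records, List<String> mainOut, List<String> deadLetter) {
        for (String record : records) {
            try {
                Instant ts = parseTimestamp(record);
                // In Beam this would be c.outputWithTimestamp(record, ts) to the main tag.
                mainOut.add(ts + " " + record);
            } catch (RuntimeException e) {
                // In Beam this would be c.output(deadLetterTag, record).
                deadLetter.add(record);
            }
        }
    }
}
```

Note that in an actual DoFn, emitting a timestamp earlier than the element's current timestamp is constrained by the allowed timestamp skew, which is one reason the thread discusses what timestamp the source itself should assign to bad records.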
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> How about returning min_timestamp? They would be dropped or
>>>>>>>>>>>>> redirected by the ParDo after that.
>>>>>>>>>>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a
>>>>>>>>>>>>> public API; is this pipeline defined under the kafkaio
>>>>>>>>>>>>> package?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> In this case, the user is attempting to handle errors when
>>>>>>>>>>>>>> parsing the timestamp. The timestamp controls the watermark
>>>>>>>>>>>>>> for the UnboundedSource; how would they control the
>>>>>>>>>>>>>> watermark in a downstream ParDo?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <[email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Ah nice. Yeah, if the user can return full bytes instead
>>>>>>>>>>>>>>>> of applying a function that would result in an exception,
>>>>>>>>>>>>>>>> this can be extracted by a ParDo down the line.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> KafkaIO does return bytes, and I think most sources
>>>>>>>>>>>>>>> should, unless there is a good reason not to.
>>>>>>>>>>>>>>> Given that, do we think Beam should provide a transform
>>>>>>>>>>>>>>> that makes it simpler to handle dead-letter output? I
>>>>>>>>>>>>>>> think there was a thread about it in the past.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <[email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> As Raghu said,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Just apply a regular ParDo and return a PCollectionTuple;
>>>>>>>>>>>>>>>>> after that you can extract your success records
>>>>>>>>>>>>>>>>> (TupleTag) and your dead-letter records (TupleTag) and
>>>>>>>>>>>>>>>>> do whatever you want with them.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Raghu Angadi <[email protected]> wrote on Wed, Oct 24, 2018, 05:18:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The user can read serialized bytes from KafkaIO and
>>>>>>>>>>>>>>>>>> deserialize them explicitly in a ParDo, which gives
>>>>>>>>>>>>>>>>>> complete control over how to handle record errors. This
>>>>>>>>>>>>>>>>>> is what I would do if I needed to in my pipeline.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> If there were a transform in Beam that does this, it
>>>>>>>>>>>>>>>>>> could be convenient for users in many such scenarios.
>>>>>>>>>>>>>>>>>> This is simpler than each source supporting it
>>>>>>>>>>>>>>>>>> explicitly.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework,
>>>>>>>>>>>>>>>>>>> this is probably not something that can easily be
>>>>>>>>>>>>>>>>>>> supported. We might be able to support similar
>>>>>>>>>>>>>>>>>>> features when we have Kafka on top of Splittable DoFn,
>>>>>>>>>>>>>>>>>>> though. So feel free to create a feature request JIRA
>>>>>>>>>>>>>>>>>>> for this.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> This is a great question. I've added the dev list to
>>>>>>>>>>>>>>>>>>>> be sure it gets noticed by whoever may know best.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Is there a way to get a dead-letter output from a
>>>>>>>>>>>>>>>>>>>>> pipeline that uses a KafkaIO connector for its
>>>>>>>>>>>>>>>>>>>>> input? As `TimestampPolicyFactory.withTimestampFn()`
>>>>>>>>>>>>>>>>>>>>> takes only a SerializableFunction and not a ParDo,
>>>>>>>>>>>>>>>>>>>>> how would I be able to produce a dead-letter output
>>>>>>>>>>>>>>>>>>>>> from it?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I have the following pipeline defined that reads
>>>>>>>>>>>>>>>>>>>>> from a KafkaIO input:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>>>>>>>   KafkaIO.<String, String>read()
>>>>>>>>>>>>>>>>>>>>>     .withBootstrapServers(bootstrap)
>>>>>>>>>>>>>>>>>>>>>     .withTopics(topics)
>>>>>>>>>>>>>>>>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>>>>>>>>>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>>>>>>>>>>         ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>>>>>>>>>>>>     .withTimestampPolicyFactory(
>>>>>>>>>>>>>>>>>>>>>         TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>>>>>>>>>>>             new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>>>>>>>>>>>     .withReadCommitted()
>>>>>>>>>>>>>>>>>>>>>     .commitOffsetsInFinalize())
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> and I would like to get dead-letter outputs when my
>>>>>>>>>>>>>>>>>>>>> timestamp extraction fails.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>> Tobi
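The approach recommended in this thread - read raw bytes from KafkaIO and do timestamp extraction in a downstream ParDo with a dead-letter side output - could look roughly like this. This is a non-runnable sketch, not code from the thread: `parseTimestampOrThrow`, `bootstrap`, and `topics` are hypothetical placeholders, and details like allowed timestamp skew are glossed over.

```java
// Sketch: route records whose timestamp parses to the main output,
// everything else to a dead-letter output.
TupleTag<KV<byte[], byte[]>> parsedTag = new TupleTag<KV<byte[], byte[]>>() {};
TupleTag<KV<byte[], byte[]>> deadLetterTag = new TupleTag<KV<byte[], byte[]>>() {};

PCollectionTuple results =
    pipeline
        .apply(KafkaIO.readBytes()
            .withBootstrapServers(bootstrap)
            .withTopics(topics)
            .withoutMetadata())
        .apply(ParDo.of(new DoFn<KV<byte[], byte[]>, KV<byte[], byte[]>>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                try {
                  Instant ts = parseTimestampOrThrow(c.element().getValue());
                  c.outputWithTimestamp(c.element(), ts);
                } catch (Exception e) {
                  // Unparsable record: keep it, with the timestamp it was read with.
                  c.output(deadLetterTag, c.element());
                }
              }
            }).withOutputTags(parsedTag, TupleTagList.of(deadLetterTag)));

results.get(parsedTag);      // main output, with extracted event timestamps
results.get(deadLetterTag);  // dead-letter output, e.g. write to a DLQ topic
```

This sidesteps `TimestampPolicyFactory.withTimestampFn()` entirely: the source assigns a coarse timestamp (e.g. Kafka log-append time), and the ParDo refines it, as several participants suggest above.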
