On Thu, Oct 25, 2018 at 10:02 AM Raghu Angadi <[email protected]> wrote:

> On Wed, Oct 24, 2018 at 11:54 PM Reuven Lax <[email protected]> wrote:
> [...]
>
>> KafkaIO has a few built-in policies for watermark and timestamp that
>>> cover most use cases (including server time, which has the benefit of
>>> providing a perfect watermark). It also gives users fairly complete
>>> control over these if they choose to. It seems reasonable for a policy
>>> to base its watermark only on parsable records, and ignore unparsable
>>> records w.r.t. watermark calculation.
>>>
>>
>> But then doesn't that force the user to set max allowed lateness to
>> infinity, otherwise these records will be dropped?
>>
>
> True, especially if any aggregations need to be performed on the
> dead-letter output. I think I understand the issue better now. Can it be
> stated something like this:
>
> What we ideally want is two independent sources, one for the main source,
> and one for the deadletter output, each with its own timestamp and
> watermark characteristics.
>
> If so, yes, I don't think it is feasible with UnboundedSource API. How
> does SDF enable this without special features from Beam?
>

I'm not sure I understand why this would require Kafka to behave as two
independent sources. Won't setting a non-negative-infinity timestamp (for
example, processing time) for failed records be enough?
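For illustration, a dependency-free sketch of that idea (the epoch-millis record format is hypothetical; in a real pipeline this logic would live inside the SerializableFunction passed to TimestampPolicyFactory.withTimestampFn): parsable records keep their event timestamp, and failed records fall back to the current processing time, so the watermark can still advance and they are not held at negative infinity or dropped as late.

```java
// Sketch only: timestamp extraction with a processing-time fallback for
// unparsable records. The "payload is epoch millis" format is made up;
// real code would parse whatever the application actually uses.
import java.time.Instant;

public class TimestampFallback {
  // Try to read an event timestamp from the record payload; fall back to
  // the supplied processing time when parsing fails, so failed records get
  // a sane (non-negative-infinity) timestamp instead of being dropped.
  static Instant extractTimestamp(String payload, Instant processingTime) {
    try {
      return Instant.ofEpochMilli(Long.parseLong(payload.trim()));
    } catch (NumberFormatException e) {
      return processingTime;
    }
  }

  public static void main(String[] args) {
    Instant now = Instant.now();
    System.out.println(extractTimestamp("1539999999000", now));
    System.out.println(extractTimestamp("not-a-timestamp", now));
  }
}
```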

Also (at least at some point) there were discussions on supporting SDFs to
report different watermarks for different outputs. More details are
available here:
https://docs.google.com/document/d/1BGc8pM1GOvZhwR9SARSVte-20XEoBUxrGJ5gTWXdv3c/edit

- Cham


>
> Raghu.
>
>>
>> It could even assign a timestamp that makes more logical sense in a
>>> particular application.
>>>
>>> On Wed, Oct 24, 2018 at 8:30 PM Kenneth Knowles <[email protected]> wrote:
>>>
>>>> Forgive me if this is naive or missing something, but here are my
>>>> thoughts on these alternatives:
>>>>
>>>> (0) The timestamp has to be pulled out in the source to control the
>>>> watermark. Luke's point is important.
>>>>
>>>> (1) If bad records get min_timestamp, and they occur infrequently
>>>> enough, then watermark will advance and they will all be dropped. That will
>>>> not allow output to a dead-letter queue.
>>>>
>>>> (2) If you have always min_timestamp records, or if bad records are
>>>> frequent, the watermark will never advance. So windows/aggregations would
>>>> never be considered complete. Triggers could be used to get output anyhow,
>>>> but it would never be a final answer. I think it is not in the spirit of
>>>> Beam to work this way. Pragmatically, no state could ever be freed by a
>>>> runner.
>>>>
>>>> In SQL there is an actual "dead letter" option when creating a table
>>>> that parses from a bytes source. If, for example, a JSON record cannot be
>>>> parsed to the expected schema - like maybe an avro record got in the
>>>> stream, or the JSON doesn't match the expected schema - it is output as-is
>>>> to a user-specified dead letter queue. I think this same level of support
>>>> is also required for records that cannot have timestamps extracted in an
>>>> unbounded source.
>>>>
>>>> In an SDF I think the function has enough control to do it all in
>>>> "userland", so Cham is right on here.
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Oct 24, 2018 at 6:54 PM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> That depends on the user's pipeline and how watermark advancement of
>>>>> the source may impact elements becoming droppably late if they are
>>>>> emitted with the minimum timestamp.
>>>>>
>>>>> On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I see.
>>>>>>
>>>>>> What I meant was to return min_timestamp for bad records in the
>>>>>> timestamp handler passed to KafkaIO itself, and the correct timestamp
>>>>>> for parsable records. That should work too, right?
>>>>>>
>>>>>> On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> Yes, that would be fine.
>>>>>>>
>>>>>>> The user could then use a ParDo which outputs to a DLQ for things it
>>>>>>> can't parse the timestamp for and use outputWithTimestamp[1] for 
>>>>>>> everything
>>>>>>> else.
>>>>>>>
>>>>>>> 1:
>>>>>>> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
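A dependency-free sketch of that split (the '|'-delimited record format and all names here are made up): in Beam, the success branch would call outputWithTimestamp on the main TupleTag, and the failure branch would output the raw record to a dead-letter tag.

```java
// Sketch only: for each raw record, either produce (payload, timestamp) --
// the moral equivalent of outputWithTimestamp on the main output -- or
// signal that the raw record should go to the dead-letter output instead.
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Optional;

public class TimestampedOutput {
  // Returns (payload, timestampMillis) when the timestamp parses,
  // empty when the record should be routed to the DLQ instead.
  static Optional<Map.Entry<String, Long>> tryEmitWithTimestamp(String record) {
    int sep = record.lastIndexOf('|');
    if (sep < 0) {
      return Optional.empty(); // no timestamp field at all -> dead letter
    }
    try {
      long ts = Long.parseLong(record.substring(sep + 1));
      return Optional.of(new SimpleEntry<>(record.substring(0, sep), ts));
    } catch (NumberFormatException e) {
      return Optional.empty(); // unparsable timestamp -> dead letter
    }
  }

  public static void main(String[] args) {
    System.out.println(tryEmitWithTimestamp("evt|1540000000000"));
    System.out.println(tryEmitWithTimestamp("garbage"));
  }
}
```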
>>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks. So returning min timestamp is OK, right (assuming the
>>>>>>>> application is fine with what it means)?
>>>>>>>>
>>>>>>>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> All records in Apache Beam have a timestamp. The default timestamp
>>>>>>>>> is the min timestamp defined here:
>>>>>>>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> You would have to return min timestamp for all records;
>>>>>>>>>>> otherwise the watermark may have advanced and you would be
>>>>>>>>>>> outputting records that are droppably late.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> That would be fine I guess. What’s the timestamp for a record
>>>>>>>>>> that doesn’t have one?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> To be clear, returning min_timestamp for unparsable records
>>>>>>>>>>>> should not affect the watermark.
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> How about returning min_timestamp? Those records would then be
>>>>>>>>>>>>> dropped or redirected by the ParDo after that.
>>>>>>>>>>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public
>>>>>>>>>>>>> API; is this pipeline defined under the kafkaio package?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <[email protected]>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> In this case, the user is attempting to handle errors when
>>>>>>>>>>>>>> parsing the timestamp. The timestamp controls the watermark for 
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> UnboundedSource, how would they control the watermark in a 
>>>>>>>>>>>>>> downstream ParDo?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Ah nice. Yeah, if the user can return raw bytes instead of
>>>>>>>>>>>>>>>> applying a function that could throw an exception, the
>>>>>>>>>>>>>>>> timestamp can be extracted by a ParDo down the line.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> KafkaIO does return bytes, and I think most sources should,
>>>>>>>>>>>>>>> unless there is a good reason not to.
>>>>>>>>>>>>>>> Given that, do we think Beam should provide a transform that
>>>>>>>>>>>>>>> makes it simpler to handle dead-letter output? I think there
>>>>>>>>>>>>>>> was a thread about it in the past.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> As Raghu said,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Just apply a regular ParDo and return a PCollectionTuple;
>>>>>>>>>>>>>>>>> after that you can extract your successful records
>>>>>>>>>>>>>>>>> (TupleTag) and your dead-letter records (TupleTag) and do
>>>>>>>>>>>>>>>>> whatever you want with them.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Raghu Angadi <[email protected]> schrieb am Mi., 24.
>>>>>>>>>>>>>>>>> Okt. 2018, 05:18:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Users can read serialized bytes from KafkaIO and
>>>>>>>>>>>>>>>>>> deserialize explicitly in a ParDo, which gives complete
>>>>>>>>>>>>>>>>>> control over how to handle record errors. This is what I
>>>>>>>>>>>>>>>>>> would do if I needed to in my pipeline.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> If there is a transform in Beam that does this, it could
>>>>>>>>>>>>>>>>>> be convenient for users in many such scenarios. This is 
>>>>>>>>>>>>>>>>>> simpler than each
>>>>>>>>>>>>>>>>>> source supporting it explicitly.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework,
>>>>>>>>>>>>>>>>>>> this is probably not something that can easily be
>>>>>>>>>>>>>>>>>>> supported. We might be able to support similar features
>>>>>>>>>>>>>>>>>>> when we have Kafka on top of Splittable DoFn though.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <
>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> This is a great question. I've added the dev list to be
>>>>>>>>>>>>>>>>>>>> sure it gets noticed by whoever may know best.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Is there a way to get a dead-letter output from a
>>>>>>>>>>>>>>>>>>>>> pipeline that uses a KafkaIO connector for its input? As
>>>>>>>>>>>>>>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>>>>>>>>>>>>>>>>>> SerializableFunction and not a ParDo, how would I be
>>>>>>>>>>>>>>>>>>>>> able to produce a dead-letter output from it?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I have the following pipeline defined that reads from
>>>>>>>>>>>>>>>>>>>>> a KafkaIO input:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>>>>>>>   KafkaIO.<String, String>read()
>>>>>>>>>>>>>>>>>>>>>     .withBootstrapServers(bootstrap)
>>>>>>>>>>>>>>>>>>>>>     .withTopics(topics)
>>>>>>>>>>>>>>>>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>>>>>>>>>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>>>>>>>>>>         ImmutableMap.of(
>>>>>>>>>>>>>>>>>>>>>             InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>>>>>>>>>>>>     .withTimestampPolicyFactory(
>>>>>>>>>>>>>>>>>>>>>         TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>>>>>>>>>>>             new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>>>>>>>>>>>     .withReadCommitted()
>>>>>>>>>>>>>>>>>>>>>     .commitOffsetsInFinalize())
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> and I'd like to get dead-letter outputs when my
>>>>>>>>>>>>>>>>>>>>> timestamp extraction fails.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>> Tobi
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
