On Thu, Oct 25, 2018 at 10:47 AM Chamikara Jayalath <[email protected]>
wrote:

>
>
> On Thu, Oct 25, 2018 at 10:41 AM Raghu Angadi <[email protected]> wrote:
>
>>
>> On Thu, Oct 25, 2018 at 10:28 AM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>> Not sure if I understand why this would require Kafka to behave as two
>>> independent sources.
>>>
>>
>>
>>
>>> Won't setting a non-negative-infinity timestamp (for example processing
>>> time) for failed records be enough ?
>>>
>>
>> Well that's what I suggested and that is what a user seems to have done.
>> The question is what if that is not what we want and we don't want to mix
>> these two together (at least from my reading of Luke's and Kenn's comments,
>> which could be off).
>>
>
> Sorry I meant setting it at the SDF itself (not from a ParDo) when we have
> SDF-based KafkaIO.
>

You can set the timestamp even now; it does not need KafkaIO to be ported
to SDF. It has always been possible to do that in KafkaIO. See Jozef Vilcek's
message above.
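For instance, a timestamp function along these lines can return the minimum
timestamp for unparsable records so a downstream ParDo can drop or redirect
them. This is a minimal sketch in plain Java (the Beam SDK is not assumed to
be on the classpath here): the stand-in constant plays the role of Beam's
`BoundedWindow.TIMESTAMP_MIN_VALUE`, and the ISO-instant payload format is a
hypothetical example.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

class TimestampFallback {
    // Stand-in for Beam's BoundedWindow.TIMESTAMP_MIN_VALUE (the real
    // constant is a Joda-Time Instant; this sketch uses epoch millis).
    static final long MIN_TIMESTAMP_MILLIS = Long.MIN_VALUE / 2;

    // Models the SerializableFunction handed to
    // TimestampPolicyFactory.withTimestampFn(): parse the record's
    // timestamp, falling back to the minimum on failure so the bad
    // record does not hold the watermark back.
    static long extractTimestampMillis(String payload) {
        try {
            return Instant.parse(payload).toEpochMilli();
        } catch (DateTimeParseException e) {
            return MIN_TIMESTAMP_MILLIS; // unparsable record
        }
    }
}
```

A ParDo immediately after the read can then treat any element carrying the
minimum timestamp as a dead-letter candidate.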


>>>
>>> Also (at least at some point) there were discussions on supporting SDFs
>>> to report different watermarks for different outputs. More details are
>>> available here:
>>> https://docs.google.com/document/d/1BGc8pM1GOvZhwR9SARSVte-20XEoBUxrGJ5gTWXdv3c/edit
>>>
>>> - Cham
>>>
>>>
>>>>
>>>> Raghu.
>>>>
>>>>>
>>>>> It could even assign a timestamp that makes more logical sense in a
>>>>>> particular application.
>>>>>>
>>>>>> On Wed, Oct 24, 2018 at 8:30 PM Kenneth Knowles <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Forgive me if this is naive or missing something, but here are my
>>>>>>> thoughts on these alternatives:
>>>>>>>
>>>>>>> (0) Timestamp has to be pulled out in the source to control the
>>>>>>> watermark. Luke's point is important.
>>>>>>>
>>>>>>> (1) If bad records get min_timestamp, and they occur infrequently
>>>>>>> enough, then the watermark will advance and they will all be dropped. That 
>>>>>>> will
>>>>>>> not allow output to a dead-letter queue.
>>>>>>>
>>>>>>> (2) If you always have min_timestamp records, or if bad records are
>>>>>>> frequent, the watermark will never advance. So windows/aggregations 
>>>>>>> would
>>>>>>> never be considered complete. Triggers could be used to get output 
>>>>>>> anyhow,
>>>>>>> but it would never be a final answer. I think it is not in the spirit of
>>>>>>> Beam to work this way. Pragmatically, no state could ever be freed by a
>>>>>>> runner.
>>>>>>>
>>>>>>> In SQL there is an actual "dead letter" option when creating a table
>>>>>>> that parses from a bytes source. If, for example, a JSON record cannot 
>>>>>>> be
>>>>>>> parsed to the expected schema - like maybe an avro record got in the
>>>>>>> stream, or the JSON doesn't match the expected schema - it is output 
>>>>>>> as-is
>>>>>>> to a user-specified dead letter queue. I think this same level of 
>>>>>>> support
>>>>>>> is also required for records that cannot have timestamps extracted in an
>>>>>>> unbounded source.
>>>>>>>
>>>>>>> In an SDF I think the function has enough control to do it all in
>>>>>>> "userland", so Cham is right on here.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 6:54 PM Lukasz Cwik <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> That depends on the user's pipeline and how watermark advancement of
>>>>>>>> the source may cause elements to become droppably late if they are emitted
>>>>>>>> with the minimum timestamp.
>>>>>>>>
>>>>>>>> On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I see.
>>>>>>>>>
>>>>>>>>> What I meant was to return min_timestamp for bad records in the
>>>>>>>>> timestamp handler passed to KafkaIO itself, and correct timestamp for
>>>>>>>>> parsable records. That should work too, right?
>>>>>>>>>
>>>>>>>>> On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Yes, that would be fine.
>>>>>>>>>>
>>>>>>>>>> The user could then use a ParDo which outputs to a DLQ for things
>>>>>>>>>> it can't parse the timestamp for and use outputWithTimestamp[1] for
>>>>>>>>>> everything else.
>>>>>>>>>>
>>>>>>>>>> 1:
>>>>>>>>>> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks. So returning min timestamp is OK, right (assuming the
>>>>>>>>>>> application is fine with what it means)?
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> All records in Apache Beam have a timestamp. The default
>>>>>>>>>>>> timestamp is the min timestamp defined here:
>>>>>>>>>>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <[email protected]>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> You would have to return min timestamp for all records
>>>>>>>>>>>>>> otherwise the watermark may have advanced and you would be 
>>>>>>>>>>>>>> outputting
>>>>>>>>>>>>>> records that are droppably late.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> That would be fine I guess. What’s the timestamp for a record
>>>>>>>>>>>>> that doesn’t have one?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> To be clear, returning min_timestamp for unparsable records
>>>>>>>>>>>>>>> should not affect the watermark.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> How about returning min_timestamp? They would be dropped or
>>>>>>>>>>>>>>>> redirected by the ParDo after that.
>>>>>>>>>>>>>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a
>>>>>>>>>>>>>>>> public API; is this pipeline defined under the kafkaio package?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> In this case, the user is attempting to handle errors when
>>>>>>>>>>>>>>>>> parsing the timestamp. The timestamp controls the watermark 
>>>>>>>>>>>>>>>>> for the
>>>>>>>>>>>>>>>>> UnboundedSource, how would they control the watermark in a 
>>>>>>>>>>>>>>>>> downstream ParDo?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Ah nice. Yeah, if the user can return full bytes instead of
>>>>>>>>>>>>>>>>>>> applying a function that would result in an exception, this can be
>>>>>>>>>>>>>>>>>>> extracted by a ParDo down the line.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> KafkaIO does return bytes, and I think most sources
>>>>>>>>>>>>>>>>>> should, unless there is a good reason not to.
>>>>>>>>>>>>>>>>>> Given that, do we think Beam should provide a transform
>>>>>>>>>>>>>>>>>> that makes it simpler to handle dead-letter output? I think there was a
>>>>>>>>>>>>>>>>>> thread about it in the past.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> As Raghu said,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Just apply a regular ParDo and return a
>>>>>>>>>>>>>>>>>>>> PCollectionTuple; after that you can extract your success records (TupleTag)
>>>>>>>>>>>>>>>>>>>> and your dead-letter records (TupleTag) and do whatever you
>>>>>>>>>>>>>>>>>>>> want with them.
>>>>>>>>>>>>>>>>>>>>
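As a rough illustration of that pattern: in Beam this would be a single ParDo
emitting to two TupleTags via `withOutputTags`, yielding a PCollectionTuple.
The sketch below is plain Java so it runs without the Beam SDK; the two lists
stand in for the two tagged output PCollections, and the `Record` type and its
numeric "parse" step are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class DeadLetterSplit {
    // Hypothetical record type standing in for the source's raw element.
    static class Record {
        final String raw;
        Record(String raw) { this.raw = raw; }
    }

    final List<Record> successes = new ArrayList<>();   // stands in for the success TupleTag output
    final List<Record> deadLetters = new ArrayList<>(); // stands in for the dead-letter TupleTag output

    // Models DoFn.processElement: try to parse the record; route
    // anything that fails to the dead-letter output instead of throwing.
    void process(Record r) {
        try {
            Long.parseLong(r.raw); // hypothetical parse step
            successes.add(r);
        } catch (NumberFormatException e) {
            deadLetters.add(r);
        }
    }
}
```

Either output can then be consumed independently, e.g. the dead letters
written to a separate sink for inspection.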
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Raghu Angadi <[email protected]> wrote on Wed,
>>>>>>>>>>>>>>>>>>>> Oct 24, 2018, 05:18:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Users can read serialized bytes from KafkaIO and
>>>>>>>>>>>>>>>>>>>>> deserialize explicitly in a ParDo, which gives complete 
>>>>>>>>>>>>>>>>>>>>> control over how to
>>>>>>>>>>>>>>>>>>>>> handle record errors. This is what I would do if I needed 
>>>>>>>>>>>>>>>>>>>>> to in my pipeline.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> If there is a transform in Beam that does this, it
>>>>>>>>>>>>>>>>>>>>> could be convenient for users in many such scenarios. 
>>>>>>>>>>>>>>>>>>>>> This is simpler than
>>>>>>>>>>>>>>>>>>>>> each source supporting it explicitly.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework,
>>>>>>>>>>>>>>>>>>>>>> this is probably not something that can easily be 
>>>>>>>>>>>>>>>>>>>>>> supported. We might be
>>>>>>>>>>>>>>>>>>>>>> able to support similar features when we have Kafka on 
>>>>>>>>>>>>>>>>>>>>>> top of Splittable
>>>>>>>>>>>>>>>>>>>>>> DoFn though.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <
>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> This is a great question. I've added the dev list to
>>>>>>>>>>>>>>>>>>>>>>> be sure it gets noticed by whoever may know best.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Is there a way to get a Deadletter Output from a
>>>>>>>>>>>>>>>>>>>>>>>> pipeline that uses a KafkaIO
>>>>>>>>>>>>>>>>>>>>>>>> connector for its input? As
>>>>>>>>>>>>>>>>>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes
>>>>>>>>>>>>>>>>>>>>>>>> only a SerializableFunction and not a ParDo, how
>>>>>>>>>>>>>>>>>>>>>>>> would I be able to produce a
>>>>>>>>>>>>>>>>>>>>>>>> Deadletter output from it?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I have the following pipeline defined that reads
>>>>>>>>>>>>>>>>>>>>>>>> from a KafkaIO input:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>>>>>>>>>>   KafkaIO.<String, String>read()
>>>>>>>>>>>>>>>>>>>>>>>>     .withBootstrapServers(bootstrap)
>>>>>>>>>>>>>>>>>>>>>>>>     .withTopics(topics)
>>>>>>>>>>>>>>>>>>>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>>>>>>>>>>>>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>>>>>>>>>>>>>>         ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>>>>>>>>>>>>>>>>>>>             inputMessagesConfig))
>>>>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>>>>>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>>>>>>>>>>>>>>>     .withTimestampPolicyFactory(
>>>>>>>>>>>>>>>>>>>>>>>>         TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>>>>>>>>>>>>>>             new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>>>>>>>>>>>>>>     .withReadCommitted()
>>>>>>>>>>>>>>>>>>>>>>>>     .commitOffsetsInFinalize())
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> and I'd like to get dead-letter outputs when my
>>>>>>>>>>>>>>>>>>>>>>>> timestamp extraction fails.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>> Tobi
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
