On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <lc...@google.com> wrote:

> You would have to return the min timestamp for all records; otherwise the
> watermark may have advanced and you would be outputting records that are
> droppably late.
>

That would be fine, I guess. What’s the timestamp for a record that doesn’t
have one?


> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <rang...@google.com> wrote:
>
>> To be clear, returning min_timestamp for unparsable records should not
>> affect the watermark.
>>
>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <rang...@google.com> wrote:
>>
>>> How about returning min_timestamp? They would be dropped or redirected by
>>> the ParDo after that.
>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API; is
>>> this pipeline defined under the kafkaio package?
>>>
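The min_timestamp fallback suggested above can be sketched as follows. This is a minimal, Beam-free sketch: the assumption that the record value carries an epoch-millis timestamp as text is illustrative, and Beam's `BoundedWindow.TIMESTAMP_MIN_VALUE` (Long.MIN_VALUE microseconds truncated to millis) is inlined as a `java.time.Instant` so the sketch stands alone. In a real pipeline the equivalent logic would live in the `SerializableFunction` passed to `TimestampPolicyFactory.withTimestampFn()`, returning the Joda-time `Instant` Beam uses.

```java
import java.time.Instant;

public class SafeTimestampFn {
  // Inlined stand-in for Beam's BoundedWindow.TIMESTAMP_MIN_VALUE
  // (Long.MIN_VALUE microseconds, truncated to milliseconds).
  static final Instant TIMESTAMP_MIN_VALUE =
      Instant.ofEpochMilli(-9223372036854775L);

  // Unparsable records get the minimum timestamp so they can never be ahead
  // of the watermark; a downstream ParDo can then drop or redirect them.
  static Instant extract(String value) {
    try {
      return Instant.ofEpochMilli(Long.parseLong(value));
    } catch (NumberFormatException e) {
      return TIMESTAMP_MIN_VALUE;
    }
  }
}
```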
>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> In this case, the user is attempting to handle errors when parsing the
>>>> timestamp. The timestamp controls the watermark for the UnboundedSource;
>>>> how would they control the watermark in a downstream ParDo?
>>>>
>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <rang...@google.com>
>>>> wrote:
>>>>
>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>>>> chamik...@google.com> wrote:
>>>>>
>>>>>> Ah nice. Yeah, if the user can return full bytes instead of applying a
>>>>>> function that would result in an exception, this can be extracted by a
>>>>>> ParDo down the line.
>>>>>>
>>>>>
>>>>> KafkaIO does return bytes, and I think most sources should, unless
>>>>> there is a good reason not to.
>>>>> Given that, do we think Beam should provide a transform that makes it
>>>>> simpler to handle dead-letter output? I think there was a thread about
>>>>> it in the past.
>>>>>
>>>>>
>>>>>>
>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>>>> jcgarc...@gmail.com> wrote:
>>>>>>
>>>>>>> As Raghu said,
>>>>>>>
>>>>>>> Just apply a regular ParDo and return a PCollectionTuple; after that
>>>>>>> you can extract your success records (TupleTag) and your dead-letter
>>>>>>> records (TupleTag) and do whatever you want with them.
>>>>>>>
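The two-branch routing described above can be illustrated with a small Beam-free sketch (the `"key:value"` payload format and all names are illustrative). In Beam itself, `parsed` would be the DoFn's main output and `deadLetters` an additional output declared with `ParDo#withOutputTags(okTag, TupleTagList.of(deadLetterTag))`, each retrieved from the resulting `PCollectionTuple` via `get(tag)`.

```java
import java.util.ArrayList;
import java.util.List;

public class DeadLetterRouting {
  final List<String> parsed = new ArrayList<>();      // main-output branch
  final List<String> deadLetters = new ArrayList<>(); // dead-letter branch

  // Stand-in for the DoFn's @ProcessElement method: try to parse, and on
  // failure emit the raw element to the dead-letter branch instead of
  // throwing and failing the bundle.
  void process(String element) {
    int sep = element.indexOf(':');
    if (sep < 0) {
      deadLetters.add(element);               // c.output(deadLetterTag, element)
    } else {
      parsed.add(element.substring(sep + 1)); // c.output(parsedValue)
    }
  }
}
```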
>>>>>>>
>>>>>>> Raghu Angadi <rang...@google.com> schrieb am Mi., 24. Okt. 2018,
>>>>>>> 05:18:
>>>>>>>
>>>>>>>> User can read serialized bytes from KafkaIO and deserialize
>>>>>>>> explicitly in a ParDo, which gives complete control over how to
>>>>>>>> handle record errors. This is what I would do if I needed to in my
>>>>>>>> pipeline.
>>>>>>>>
>>>>>>>> If there is a transform in Beam that does this, it could be
>>>>>>>> convenient for users in many such scenarios. This is simpler than each
>>>>>>>> source supporting it explicitly.
>>>>>>>>
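The bytes-first reading suggested above might look like the following fragment. This is a sketch, not code from the thread: it assumes Kafka's `ByteArrayDeserializer` and reuses the `pipeline`, `bootstrap`, and `topics` variables from the pipeline shown later in this thread.

```java
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Read raw bytes so deserialization failures surface in a downstream ParDo
// (where they can be routed to a dead-letter output) instead of failing
// inside the source.
pipeline.apply(
    KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers(bootstrap)
        .withTopics(topics)
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class));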
>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework, this is
>>>>>>>>> probably not something that can easily be supported. We might be
>>>>>>>>> able to support similar features when we have Kafka on top of
>>>>>>>>> Splittable DoFn though.
>>>>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Cham
>>>>>>>>>
>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <k...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> This is a great question. I've added the dev list to be sure it
>>>>>>>>>> gets noticed by whoever may know best.
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Is there a way to get a dead-letter output from a pipeline that
>>>>>>>>>>> uses a KafkaIO connector for its input? As
>>>>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>>>>>>>> SerializableFunction and not a ParDo, how would I be able to
>>>>>>>>>>> produce a dead-letter output from it?
>>>>>>>>>>>
>>>>>>>>>>> I have the following pipeline defined that reads from a KafkaIO
>>>>>>>>>>> input:
>>>>>>>>>>>
>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>   KafkaIO.<String, String>read()
>>>>>>>>>>>     .withBootstrapServers(bootstrap)
>>>>>>>>>>>     .withTopics(topics)
>>>>>>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>     .updateConsumerProperties(
>>>>>>>>>>>         ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>>>>>>             inputMessagesConfig))
>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>>     .withTimestampPolicyFactory(
>>>>>>>>>>>         TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>             new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>     .withReadCommitted()
>>>>>>>>>>>     .commitOffsetsInFinalize())
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> and I would like to get dead-letter outputs when my timestamp
>>>>>>>>>>> extraction fails.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Tobi
>>>>>>>>>>>
>>>>>>>>>>>
