Hi Theodore,

Thanks again for your insight and help. I'd like to learn more from [email protected]
<[email protected]> about how the timestamp on the WindowedValue is set in the
first place.
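
For reference, the rule stated in the error message can be sketched in a few lines. This is a simplified, hypothetical re-creation using java.time, not Beam's actual SimpleDoFnRunner code (which uses Joda-Time). It rejects an output timestamp that is earlier than the input element's timestamp minus the DoFn's allowed skew. The class and method names are made up for illustration. The sample values come from the error message in this thread.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical, simplified re-creation of the check described by the error
 * message: outputTs must be no earlier than inputTs minus allowedSkew.
 * Not Beam's implementation; names are illustrative only.
 */
final class TimestampSkewCheck {

  /** True if outputTs >= inputTs - allowedSkew. */
  static boolean isAllowed(Instant inputTs, Instant outputTs, Duration allowedSkew) {
    Instant lowerBound = inputTs.minus(allowedSkew);
    return !outputTs.isBefore(lowerBound);
  }

  /** Throws with a message modeled on the one reported in this thread. */
  static void check(Instant inputTs, Instant outputTs, Duration allowedSkew) {
    if (!isAllowed(inputTs, outputTs, allowedSkew)) {
      throw new IllegalArgumentException(String.format(
          "Cannot output with timestamp %s. Output timestamps must be no earlier than the "
              + "timestamp of the current input (%s) minus the allowed skew (%s).",
          outputTs, inputTs, allowedSkew));
    }
  }

  public static void main(String[] args) {
    // Input timestamp as reported in the error message.
    Instant input = Instant.parse("2019-08-16T12:39:06.887Z");
    // Event time parsed from the file: 2019-08-16T12:30:15.120Z.
    Instant output = Instant.ofEpochMilli(1565958615120L);
    System.out.println(isAllowed(input, output, Duration.ZERO));
    System.out.println(isAllowed(input, output, Duration.ofMinutes(10)));
  }
}
```

With the default zero skew, the output at 12:30:15.120Z is rejected because it is about 8 minutes 52 seconds earlier than the input timestamp. A 10-minute skew would admit it. That is the trade-off behind the deprecated skew setting, since shifting timestamps backwards can produce late data.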

-Chengzhi

On Fri, Aug 16, 2019 at 7:41 PM Theodore Siu <[email protected]> wrote:

> Hi Chengzhi,
>
> I'm not completely sure where or how the timestamp is set for a
> ProcessContext object. Here is the code in the Apache Beam repo that
> raises the error:
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
> It refers to `elem.getTimestamp()`, where elem is a WindowedValue.
>
> I am thinking [email protected] <[email protected]> can offer some
> insight. Would be interested to find out more myself.
>
> -Theo
>
> On Fri, Aug 16, 2019 at 3:04 PM Chengzhi Zhao <[email protected]>
> wrote:
>
>> Hi Theodore,
>>
>> Thanks for your reply. This is just a simple example I wrote to
>> understand how event time works in Beam. In practice I could have more
>> fields, and each record would have its own event time, so I am trying to
>> tell Beam which field holds the event time to use for later windowing and
>> computation.
>>
>> The probable reason you mentioned sounds reasonable. I am still trying to
>> figure out where "current input (2019-08-16T12:39:06.887Z)" in the error
>> message is coming from, if you have any insight on it.
>>
>> Thanks a lot for your help.
>>
>> -- Chengzhi
>>
>> On Fri, Aug 16, 2019 at 9:57 AM Theodore Siu <[email protected]> wrote:
>>
>>> Hi Chengzhi,
>>>
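>>> Are you simply trying to emit the timestamp onward? Why not just use
>>> `out.output` with a PCollection<Instant>?
>>>
>>> import org.apache.beam.sdk.transforms.DoFn;
>>> import org.joda.time.Instant;
>>>
>>> // The output type is Instant, so the DoFn must be DoFn<String, Instant>.
>>> static class ReadWithEventTime extends DoFn<String, Instant> {
>>>     @DoFn.ProcessElement
>>>     public void processElement(@Element String line, OutputReceiver<Instant> out) {
>>>         // Parse the epoch-millisecond line and emit it as an Instant.
>>>         out.output(new Instant(Long.parseLong(line)));
>>>     }
>>> }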
>>>
>>> You can also output the line itself as a PCollection<String>. If your
>>> line has additional information to parse, consider a KV (key-value pair),
>>> which lets you emit both some parsed content of the string and the
>>> timestamp:
>>> https://beam.apache.org/releases/javadoc/2.2.0/index.html?org/apache/beam/sdk/values/KV.html
>>>
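Here is a minimal sketch of the parsing half of the KV idea. It assumes a hypothetical line format `epochMillis,payload`; the files in this thread contain only the timestamp. It uses only the JDK so it runs without Beam. Beam itself uses Joda-Time's Instant, and java.time.Instant plus a Map.Entry stand in for Beam's types here. Inside a DoFn<String, KV<String, Instant>>, the resulting pair would be emitted with `out.output(KV.of(payload, eventTime))`.

```java
import java.time.Instant;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

/**
 * Hypothetical parser for lines of the form "epochMillis,payload",
 * e.g. "1565958615120,click". Returns (payload, event time).
 * A Map.Entry stands in for Beam's KV so the sketch runs without Beam.
 */
final class EventLineParser {

  static Map.Entry<String, Instant> parse(String line) {
    // Split only on the first comma so the payload may itself contain commas.
    int comma = line.indexOf(',');
    if (comma < 0) {
      throw new IllegalArgumentException("Expected 'epochMillis,payload' but got: " + line);
    }
    long millis = Long.parseLong(line.substring(0, comma).trim());
    String payload = line.substring(comma + 1);
    return new SimpleImmutableEntry<>(payload, Instant.ofEpochMilli(millis));
  }

  public static void main(String[] args) {
    Map.Entry<String, Instant> kv = parse("1565958615120,click");
    System.out.println(kv.getKey() + " @ " + kv.getValue());
  }
}
```

Keeping the event time as a value, rather than re-stamping the element, avoids the backward-shift restriction entirely. A downstream step can then decide how to use it.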
>>> The probable reason why outputWithTimestamp doesn't work with older
>>> timestamps is that the emitted timestamp is used for windowing: in
>>> streaming pipelines it determines which window each record belongs to for
>>> aggregations. Shifting a timestamp backwards could place the element
>>> behind the watermark, making it late data.
>>>
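The sketch below illustrates why the element timestamp matters for windowing. It shows fixed-window assignment with zero offset, the idea behind Beam's FixedWindows: an element with event time t falls in [start, start + size), where start = t - (t mod size). It is re-implemented with java.time rather than Beam's classes. The one-minute window size is an arbitrary choice for this example.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Sketch of zero-offset fixed-window assignment:
 * start = t - (t mod size), window = [start, start + size).
 * Re-implemented with java.time; not Beam's FixedWindows class.
 */
final class FixedWindowSketch {

  static Instant windowStart(Instant eventTime, Duration size) {
    long t = eventTime.toEpochMilli();
    long s = size.toMillis();
    // floorMod keeps the result correct for timestamps before the epoch too.
    return Instant.ofEpochMilli(t - Math.floorMod(t, s));
  }

  public static void main(String[] args) {
    Duration oneMinute = Duration.ofMinutes(1);
    // The three epoch-millisecond timestamps from the file in this thread.
    long[] fromFile = {1565958615120L, 1565958615120L, 1565958615121L};
    for (long millis : fromFile) {
      Instant ts = Instant.ofEpochMilli(millis);
      Instant start = windowStart(ts, oneMinute);
      System.out.println(ts + " -> [" + start + ", " + start.plus(oneMinute) + ")");
    }
  }
}
```

All three timestamps from the file land in the 12:30 to 12:31 window. The input timestamp from the error message (12:39:06.887Z) would fall in the 12:39 window, which is the kind of difference the element timestamp makes downstream.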
>>> -Theo
>>>
>>>
>>> On Fri, Aug 16, 2019 at 8:52 AM Chengzhi Zhao <[email protected]>
>>> wrote:
>>>
>>>> Hi folks,
>>>>
>>>> I am new to Beam and am trying out some examples. I am running Beam
>>>> 2.14 with the Direct runner to read some files (which I continuously generate).
>>>>
>>>> I am facing this error: Cannot output with timestamp
>>>> 2019-08-16T12:30:15.120Z. Output timestamps must be no earlier than the
>>>> timestamp of the current input (2019-08-16T12:39:06.887Z) minus the allowed
>>>> skew (0 milliseconds). I searched online but still don't quite understand
>>>> it, so I am asking here for some help.
>>>>
>>>> A file has some past timestamps (epoch milliseconds) in it:
>>>> 1565958615120
>>>> 1565958615120
>>>> 1565958615121
>>>>
>>>> My code looks something like this:
>>>>
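>>>> import java.io.File;
>>>> import org.apache.beam.sdk.Pipeline;
>>>> import org.apache.beam.sdk.io.TextIO;
>>>> import org.apache.beam.sdk.options.PipelineOptions;
>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>> import org.apache.beam.sdk.transforms.ParDo;
>>>> import org.apache.beam.sdk.transforms.Watch;
>>>> import org.apache.beam.sdk.values.PCollection;
>>>> import org.joda.time.Duration;
>>>> import org.joda.time.Instant;
>>>>
>>>> // Each line is an epoch-millisecond timestamp; re-stamp the element with it.
>>>> static class ReadWithEventTime extends DoFn<String, String> {
>>>>     @ProcessElement
>>>>     public void processElement(@Element String line, OutputReceiver<String> out) {
>>>>         out.outputWithTimestamp(line, new Instant(Long.parseLong(line)));
>>>>     }
>>>> }
>>>>
>>>> public static void main(String[] args) {
>>>>     PipelineOptions options = PipelineOptionsFactory.create();
>>>>     Pipeline pipeline = Pipeline.create(options);
>>>>
>>>>     String sourcePath = new File("files/").getPath();
>>>>
>>>>     // Watch for new files matching test*, polling every 5 seconds, forever.
>>>>     PCollection<String> data = pipeline.apply("ReadData",
>>>>             TextIO.read().from(sourcePath + "/test*")
>>>>                     .watchForNewFiles(Duration.standardSeconds(5),
>>>>                             Watch.Growth.<String>never()));
>>>>
>>>>     data.apply("ReadWithEventTime", ParDo.of(new ReadWithEventTime()));
>>>>
>>>>     pipeline.run().waitUntilFinish();
>>>> }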
>>>>
>>>>
>>>> I am trying to understand where "current input (2019-08-16T12:39:06.887Z)"
>>>> in the error message is coming from. Is it the lowest watermark when I
>>>> start my application? If so, is there a way to change the initial
>>>> watermark?
>>>>
>>>> Also, I could set up `withAllowedTimestampSkew`, but it looks like it has
>>>> been deprecated.
>>>>
>>>> Any suggestion would be appreciated. Thank you!
>>>>
>>>> Best,
>>>> Chengzhi
>>>>
>>>>
>>>
