Hi Theodore,

Thanks again for your insight and help. I'd like to learn more from [email protected] <[email protected]> about how the timestamp on the WindowedValue is set initially.

-Chengzhi

On Fri, Aug 16, 2019 at 7:41 PM Theodore Siu <[email protected]> wrote:

> Hi Chengzhi,
>
> I'm not completely sure where/how the timestamp is set for a
> ProcessContext object. Here is the code in the Apache Beam repo that
> raises the error:
>
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
>
> It makes reference to `elem.getTimestamp()`, where elem is a
> WindowedValue.
>
> I am thinking [email protected] <[email protected]> can offer some
> insight. I would be interested to find out more myself.
>
> -Theo
>
> On Fri, Aug 16, 2019 at 3:04 PM Chengzhi Zhao <[email protected]>
> wrote:
>
>> Hi Theodore,
>>
>> Thanks for your reply. This is just a simple example that I wrote to
>> understand how event time works in Beam. I could have more fields, and I
>> would have an event time for each record, so I tried to let Beam know
>> which field is the event time to use for later windowing and computation.
>>
>> I think the probable reason you mentioned sounds reasonable. I am
>> still trying to figure out where the "current input
>> (2019-08-16T12:39:06.887Z)" in the error message is coming from, if you
>> have any insight on it.
>>
>> Thanks a lot for your help.
>>
>> -- Chengzhi
>>
>> On Fri, Aug 16, 2019 at 9:57 AM Theodore Siu <[email protected]> wrote:
>>
>>> Hi Chengzhi,
>>>
>>> Are you simply trying to emit the timestamp onward? Why not just use
>>> `out.output` with a PCollection<Instant>?
>>>
>>> static class ReadWithEventTime extends DoFn<String, Instant> {
>>>   @DoFn.ProcessElement
>>>   public void processElement(@Element String line,
>>>       OutputReceiver<Instant> out) {
>>>     out.output(new Instant(Long.parseLong(line)));
>>>   }
>>> }
>>>
>>> You can also output the line itself as a PCollection<String>.
>>> If your line has additional information to parse, consider a KeyValue pair
>>> https://beam.apache.org/releases/javadoc/2.2.0/index.html?org/apache/beam/sdk/values/KV.html
>>> where you can emit both some parsed context of the string and the
>>> timestamp.
>>>
>>> The probable reason why outputWithTimestamp doesn't work with older
>>> times is that the emitted timestamp is used specifically for windowing:
>>> streaming-type data pipelines use it to determine which window each
>>> record belongs to for aggregations.
>>>
>>> -Theo
>>>
>>>
>>> On Fri, Aug 16, 2019 at 8:52 AM Chengzhi Zhao <[email protected]>
>>> wrote:
>>>
>>>> Hi folks,
>>>>
>>>> I am new to Beam and am trying to play with some examples. I am running
>>>> Beam 2.14 with the Direct runner to read some files (which I generate
>>>> continuously).
>>>>
>>>> I am facing this error: Cannot output with timestamp
>>>> 2019-08-16T12:30:15.120Z. Output timestamps must be no earlier than the
>>>> timestamp of the current input (2019-08-16T12:39:06.887Z) minus the allowed
>>>> skew (0 milliseconds). I searched online but still don't quite understand
>>>> it, so I am asking here for some help.
>>>>
>>>> A file has some past timestamps in it:
>>>> 1565958615120
>>>> 1565958615120
>>>> 1565958615121
>>>>
>>>> My code looks something like this:
>>>>
>>>> static class ReadWithEventTime extends DoFn<String, String> {
>>>>   @ProcessElement
>>>>   public void processElement(@Element String line,
>>>>       OutputReceiver<String> out) {
>>>>     out.outputWithTimestamp(line, new Instant(Long.parseLong(line)));
>>>>   }
>>>> }
>>>>
>>>> public static void main(String[] args) {
>>>>   PipelineOptions options = PipelineOptionsFactory.create();
>>>>   Pipeline pipeline = Pipeline.create(options);
>>>>
>>>>   String sourcePath = new File("files/").getPath();
>>>>
>>>>   PCollection<String> data = pipeline.apply("ReadData",
>>>>       TextIO.read().from(sourcePath + "/test*")
>>>>           .watchForNewFiles(Duration.standardSeconds(5),
>>>>               Watch.Growth.<String>never()));
>>>>
>>>>   data.apply("ReadWithEventTime", ParDo.of(new ReadWithEventTime()));
>>>>
>>>>   pipeline.run().waitUntilFinish();
>>>> }
>>>>
>>>>
>>>> I am trying to understand where the "current input
>>>> (2019-08-16T12:39:06.887Z)" in the error message is coming from. Is it
>>>> the lowest watermark when I start my application? If that's the case, is
>>>> there a way that I can change the initial watermark?
>>>>
>>>> Also, I can set up `withAllowedTimestampSkew`, but it looks like it has
>>>> been deprecated.
>>>>
>>>> Any suggestions would be appreciated. Thank you!
>>>>
>>>> Best,
>>>> Chengzhi
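
For anyone reading this thread later: the rule stated in the error message can be reproduced with the numbers quoted above. The following is only a plain-Java sketch of that rule using java.time. It is not Beam's code, and the class name `TimestampSkewSketch` and helper `isAllowed` are hypothetical names made up for illustration. It assumes the check is exactly "output is no earlier than input minus allowed skew", as the message says.

```java
import java.time.Duration;
import java.time.Instant;

public class TimestampSkewSketch {

    // Hypothetical helper (not a Beam API): returns true when the output
    // timestamp is no earlier than the input timestamp minus the allowed skew.
    public static boolean isAllowed(Instant input, Instant output, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    public static void main(String[] args) {
        // "Current input" timestamp quoted in the error message.
        Instant input = Instant.parse("2019-08-16T12:39:06.887Z");
        // First value from the file, i.e. 2019-08-16T12:30:15.120Z.
        Instant output = Instant.ofEpochMilli(1565958615120L);

        // With a skew of 0 ms the output is about 8m52s too early, so it is rejected.
        System.out.println(isAllowed(input, output, Duration.ZERO));          // prints false
        // With a 10-minute skew (an arbitrary value for illustration) it would pass.
        System.out.println(isAllowed(input, output, Duration.ofMinutes(10))); // prints true
    }
}
```

The sketch only covers the comparison itself. It says nothing about where the input timestamp of 12:39:06.887Z comes from, which is the open question in this thread.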
