Hi Robert,

Thanks for the information; that explains the behavior I noticed. For now, my workaround would be either to shift the watermark somehow, or to start the streaming job before any files arrive so that the initial watermark has settled.
I will keep watching the JIRA you shared. Thanks for the insights.

-Chengzhi

On Tue, Aug 20, 2019 at 4:53 PM Robert Bradshaw <[email protected]> wrote:

> The original timestamps are probably being assigned in the
> watchForNewFiles transform, which is also setting the watermark:
>
> https://github.com/apache/beam/blob/release-2.15.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L668
>
> Until https://issues.apache.org/jira/browse/BEAM-644 is resolved, it
> probably makes sense to be able to customize the lag here.
>
> On Fri, Aug 16, 2019 at 6:44 PM Chengzhi Zhao <[email protected]> wrote:
> >
> > Hi Theodore,
> >
> > Thanks again for your insight and help. I'd like to learn more from + [email protected] about how we initially get the timestamp from WindowedValue.
> >
> > -Chengzhi
> >
> > On Fri, Aug 16, 2019 at 7:41 PM Theodore Siu <[email protected]> wrote:
> >>
> >> Hi Chengzhi,
> >>
> >> I'm not completely sure where/how the timestamp is set for a ProcessContext object. Here is the code in the Apache Beam repo that raises the error:
> >> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
> >> It references `elem.getTimestamp()`, where elem is a WindowedValue.
> >>
> >> I am thinking [email protected] can offer some insight. I would be interested to find out more myself.
> >>
> >> -Theo
> >>
> >> On Fri, Aug 16, 2019 at 3:04 PM Chengzhi Zhao <[email protected]> wrote:
> >>>
> >>> Hi Theodore,
> >>>
> >>> Thanks for your reply. This is just a simple example that I am using to understand how event time works in Beam. I could have more fields, and I would have an event time for each record, so I tried to let Beam know which field is the event time to use for later windowing and computation.
> >>>
> >>> The probable reason you mentioned sounds reasonable. I am still trying to figure out where "current input (2019-08-16T12:39:06.887Z)" in the error message is coming from, if you have any insight on it.
> >>>
> >>> Thanks a lot for your help.
> >>>
> >>> -- Chengzhi
> >>>
> >>> On Fri, Aug 16, 2019 at 9:57 AM Theodore Siu <[email protected]> wrote:
> >>>>
> >>>> Hi Chengzhi,
> >>>>
> >>>> Are you simply trying to emit the timestamp onward? Why not just use `out.output` with a PCollection<Instant>?
> >>>>
> >>>>     // The DoFn output type must match the OutputReceiver type (Instant).
> >>>>     static class ReadWithEventTime extends DoFn<String, Instant> {
> >>>>         @DoFn.ProcessElement
> >>>>         public void processElement(@Element String line, OutputReceiver<Instant> out) {
> >>>>             out.output(new Instant(Long.parseLong(line)));
> >>>>         }
> >>>>     }
> >>>>
> >>>> You can also output the line itself as a PCollection<String>. If your line has additional information to parse, consider a key-value pair (KV),
> >>>> https://beam.apache.org/releases/javadoc/2.2.0/index.html?org/apache/beam/sdk/values/KV.html
> >>>> where you can emit both some parsed context of the string and the timestamp.
> >>>>
> >>>> The probable reason why outputWithTimestamp doesn't work with older times is that the emitted timestamp is used specifically for windowing, and for streaming-type data pipelines to determine which window each record belongs to for aggregations.
> >>>>
> >>>> -Theo
> >>>>
> >>>> On Fri, Aug 16, 2019 at 8:52 AM Chengzhi Zhao <[email protected]> wrote:
> >>>>>
> >>>>> Hi folks,
> >>>>>
> >>>>> I am new to Beam and am trying out some examples. I am running Beam 2.14 with the Direct runner to read some files (which I generate continuously).
> >>>>>
> >>>>> I am facing this error: Cannot output with timestamp 2019-08-16T12:30:15.120Z. Output timestamps must be no earlier than the timestamp of the current input (2019-08-16T12:39:06.887Z) minus the allowed skew (0 milliseconds). I searched online but still don't quite understand it, so I am asking here for some help.
> >>>>>
> >>>>> A file has some past timestamps in it:
> >>>>> 1565958615120
> >>>>> 1565958615120
> >>>>> 1565958615121
> >>>>>
> >>>>> My code looks something like this:
> >>>>>
> >>>>>     static class ReadWithEventTime extends DoFn<String, String> {
> >>>>>         @ProcessElement
> >>>>>         public void processElement(@Element String line, OutputReceiver<String> out) {
> >>>>>             // Each line is an epoch-millisecond value used as the event time.
> >>>>>             out.outputWithTimestamp(line, new Instant(Long.parseLong(line)));
> >>>>>         }
> >>>>>     }
> >>>>>
> >>>>>     public static void main(String[] args) {
> >>>>>         PipelineOptions options = PipelineOptionsFactory.create();
> >>>>>         Pipeline pipeline = Pipeline.create(options);
> >>>>>
> >>>>>         String sourcePath = new File("files/").getPath();
> >>>>>
> >>>>>         PCollection<String> data = pipeline.apply("ReadData",
> >>>>>             TextIO.read().from(sourcePath + "/test*")
> >>>>>                 .watchForNewFiles(Duration.standardSeconds(5), Watch.Growth.<String>never()));
> >>>>>
> >>>>>         data.apply("ReadWithEventTime", ParDo.of(new ReadWithEventTime()));
> >>>>>
> >>>>>         pipeline.run().waitUntilFinish();
> >>>>>     }
> >>>>>
> >>>>> I am trying to understand where "current input (2019-08-16T12:39:06.887Z)" in the error message is coming from. Is it the lowest watermark when I start my application? If that's the case, is there a way that I can change the initial watermark?
> >>>>>
> >>>>> Also, I can set `withAllowedTimestampSkew`, but it looks like it has been deprecated.
> >>>>>
> >>>>> Any suggestion would be appreciated. Thank you!
> >>>>>
> >>>>> Best,
> >>>>> Chengzhi
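
For anyone following the thread, the rule stated in the error message can be sketched in plain Java. This is only a minimal illustration, not Beam's implementation: `SkewCheck` and `isOutputAllowed` are hypothetical names, and the sketch uses `java.time` types rather than the Joda-Time types that Beam uses. The two timestamps are the ones from the error message quoted above.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Beam: it restates the rule from the error
// message using java.time types instead of Beam's Joda-Time types.
final class SkewCheck {

    // An output timestamp is accepted only if it is no earlier than
    // (input timestamp - allowed skew).
    static boolean isOutputAllowed(Instant inputTimestamp,
                                   Instant outputTimestamp,
                                   Duration allowedSkew) {
        Instant earliestAllowed = inputTimestamp.minus(allowedSkew);
        return !outputTimestamp.isBefore(earliestAllowed);
    }

    public static void main(String[] args) {
        // Timestamp of the current input, as reported in the error message.
        Instant input = Instant.parse("2019-08-16T12:39:06.887Z");
        // Event time parsed from the file line 1565958615120.
        Instant output = Instant.ofEpochMilli(1565958615120L);

        // Check with the zero skew reported in the error message.
        System.out.println(isOutputAllowed(input, output, Duration.ZERO));

        // How far the parsed event time lies behind the input timestamp.
        System.out.println(Duration.between(output, input));
    }
}
```

With zero allowed skew the check fails, because the parsed event time lies a little under nine minutes before the input timestamp. If, as Robert suggests, the input timestamp is assigned by watchForNewFiles, then any event time older than that assigned timestamp would be rejected in the same way.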
