Hi Robert,

Thanks for the information; that explains the behavior I noticed. I guess my
current workaround would be either to shift the watermark somehow, or to start
the streaming pipeline before any files arrive so that the initial watermark
settles down.

I will keep an eye on the JIRA issue you shared. Thanks for the insights.

-Chengzhi

On Tue, Aug 20, 2019 at 4:53 PM Robert Bradshaw <[email protected]> wrote:

> The original timestamps are probably being assigned in the
> watchForNewFiles transform, which is also setting the watermark:
>
>
> https://github.com/apache/beam/blob/release-2.15.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L668
>
> Until https://issues.apache.org/jira/browse/BEAM-644 is resolved, it
> probably makes sense to be able to customize the lag here.
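To make the lag idea concrete, here is a toy model in plain Java. It is not Beam code and has no Beam dependency. It assumes, as suggested above, that the file watcher stamps each element with the poll time and advances its watermark to that poll time minus a lag. The `LaggedWatermark` class and its `lag` knob are hypothetical, and the poll time simply reuses the "current input" timestamp from the original error message.

```java
import java.time.Duration;
import java.time.Instant;

// Toy model only (hypothetical, not Beam's API): a watcher that reports
// watermark = pollTime - lag after each poll. The configurable lag stands in
// for the customization discussed above; Beam does not expose this class.
final class LaggedWatermark {
    private final Duration lag;

    LaggedWatermark(Duration lag) {
        this.lag = lag;
    }

    /** Watermark reported after a poll that happened at pollTime. */
    Instant watermarkAfterPoll(Instant pollTime) {
        return pollTime.minus(lag);
    }

    /** True if an element with this event time is already behind the watermark. */
    boolean isBehindWatermark(Instant eventTime, Instant pollTime) {
        return eventTime.isBefore(watermarkAfterPoll(pollTime));
    }

    public static void main(String[] args) {
        Instant poll = Instant.parse("2019-08-16T12:39:06.887Z");
        Instant event = Instant.ofEpochMilli(1565958615120L); // 2019-08-16T12:30:15.120Z
        System.out.println(new LaggedWatermark(Duration.ZERO).isBehindWatermark(event, poll));        // true
        System.out.println(new LaggedWatermark(Duration.ofMinutes(10)).isBehindWatermark(event, poll)); // false
    }
}
```

With zero lag, the 12:30:15.120 record is already behind a watermark taken at poll time. Under these assumptions, a 10-minute lag would keep it on time.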
>
> On Fri, Aug 16, 2019 at 6:44 PM Chengzhi Zhao <[email protected]>
> wrote:
> >
> > Hi Theodore,
> >
> > Thanks again for your insight and help. I'd like to learn more about how
> > the timestamp on the WindowedValue gets set in the first place. Adding
> > [email protected] to the thread.
> >
> > -Chengzhi
> >
> > On Fri, Aug 16, 2019 at 7:41 PM Theodore Siu <[email protected]> wrote:
> >>
> >> Hi Chengzhi,
> >>
> >> I'm not completely sure where or how the timestamp is set for a
> >> ProcessContext object. Here is the code that raises the error in the
> >> Apache Beam repo:
> >> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
> >> It refers to `elem.getTimestamp()`, where `elem` is a WindowedValue.
> >>
> >> I think [email protected] could offer some insight. I'd be interested to
> >> find out more myself.
> >>
> >> -Theo
> >>
> >> On Fri, Aug 16, 2019 at 3:04 PM Chengzhi Zhao <[email protected]>
> wrote:
> >>>
> >>> Hi Theodore,
> >>>
> >>> Thanks for your reply. This is just a simple example I wrote to
> >>> understand how event time works in Beam. In practice a record could have
> >>> more fields, one of which is its event time, so I am trying to tell Beam
> >>> which field holds the event time to use for later windowing and
> >>> computation.
> >>>
> >>> The probable reason you mentioned sounds reasonable. I am still trying
> >>> to figure out where "current input (2019-08-16T12:39:06.887Z)" in the
> >>> error message is coming from, if you have any insight on it.
> >>>
> >>> Thanks a lot for your help.
> >>>
> >>> -- Chengzhi
> >>>
> >>> On Fri, Aug 16, 2019 at 9:57 AM Theodore Siu <[email protected]>
> wrote:
> >>>>
> >>>> Hi Chengzhi,
> >>>>
> >>>> Are you simply trying to emit the timestamp onward? Why not just use
> >>>> `out.output` with a PCollection<Instant>?
> >>>>
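> >>>> import org.apache.beam.sdk.transforms.DoFn;
> >>>> import org.joda.time.Instant;
> >>>>
> >>>> // Output type is Instant, matching the OutputReceiver below.
> >>>> static class ReadWithEventTime extends DoFn<String, Instant> {
> >>>>     @DoFn.ProcessElement
> >>>>     public void processElement(@Element String line,
> >>>>                                OutputReceiver<Instant> out) {
> >>>>         // Each line holds epoch milliseconds, e.g. 1565958615120.
> >>>>         out.output(new Instant(Long.parseLong(line.trim())));
> >>>>     }
> >>>> }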
> >>>>
> >>>> You can also output the line itself as a PCollection<String>. If your
> >>>> line has additional information to parse, consider a key/value pair (KV)
> >>>> https://beam.apache.org/releases/javadoc/2.2.0/index.html?org/apache/beam/sdk/values/KV.html
> >>>> where you can emit both some parsed content of the string and the
> >>>> timestamp.
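A minimal sketch of the KV idea using only the JDK. Here `java.util.Map.Entry` stands in for Beam's `KV`, and `java.time.Instant` stands in for Joda's `Instant`. The `id,epochMillis` line format and the `ParseKeyedEvent` class are hypothetical, chosen to show a record that carries an extra field besides its event time.

```java
import java.time.Instant;
import java.util.AbstractMap;
import java.util.Map;

// Sketch only: Map.Entry stands in for Beam's KV, and "id,epochMillis"
// is a hypothetical line format (one extra field plus the event time).
final class ParseKeyedEvent {

    static Map.Entry<String, Instant> parse(String line) {
        String[] fields = line.split(",", 2);
        if (fields.length != 2) {
            throw new IllegalArgumentException("expected id,epochMillis but got: " + line);
        }
        String id = fields[0].trim();
        // The second field is interpreted as milliseconds since the epoch.
        Instant eventTime = Instant.ofEpochMilli(Long.parseLong(fields[1].trim()));
        return new AbstractMap.SimpleImmutableEntry<>(id, eventTime);
    }

    public static void main(String[] args) {
        Map.Entry<String, Instant> kv = parse("sensor-1,1565958615120");
        System.out.println(kv.getKey() + " @ " + kv.getValue()); // sensor-1 @ 2019-08-16T12:30:15.120Z
    }
}
```

Inside a pipeline, the same parsing would presumably live in a DoFn<String, KV<String, Instant>> that emits `KV.of(id, eventTime)`. That keeps the event time available as data, without re-stamping the element.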
> >>>>
> >>>> The probable reason why outputWithTimestamp doesn't work with older
> >>>> times is that the emitted timestamp is what windowing uses: in streaming
> >>>> pipelines it determines which window each record belongs to for
> >>>> aggregations, so moving it backwards could make records late relative
> >>>> to the watermark.
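To make the windowing point concrete, here is a simplified plain-Java model of fixed one-minute windows with zero offset. It is roughly what Beam's `FixedWindows` does, but `FixedWindowModel` is a hypothetical name and not Beam code. Each timestamp falls in the window that starts at that timestamp rounded down to a multiple of the window size.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model (hypothetical, not Beam code) of tumbling windows with
// zero offset: a timestamp belongs to [start, start + size), where start is
// the timestamp rounded down to a multiple of size since the epoch.
final class FixedWindowModel {

    static Instant windowStart(Instant ts, Duration size) {
        long millis = ts.toEpochMilli();
        long sizeMillis = size.toMillis();
        // floorMod keeps the rounding correct for pre-epoch (negative) times too.
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, sizeMillis));
    }

    public static void main(String[] args) {
        Duration oneMinute = Duration.ofMinutes(1);
        long[] eventTimes = {1565958615120L, 1565958615120L, 1565958615121L};
        for (long t : eventTimes) {
            Instant start = windowStart(Instant.ofEpochMilli(t), oneMinute);
            System.out.println(Instant.ofEpochMilli(t) + " -> [" + start + ", "
                    + start.plus(oneMinute) + ")");
        }
    }
}
```

All three sample timestamps from the original message land in the [2019-08-16T12:30:00Z, 2019-08-16T12:31:00Z) window. That is only true if the element carries its event time rather than the time the file was read.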
> >>>>
> >>>> -Theo
> >>>>
> >>>>
> >>>> On Fri, Aug 16, 2019 at 8:52 AM Chengzhi Zhao <
> [email protected]> wrote:
> >>>>>
> >>>>> Hi folks,
> >>>>>
> >>>>> I am new to Beam and am trying to play with some examples. I am running
> >>>>> Beam 2.14 with the Direct runner to read some files (which I generate
> >>>>> continuously).
> >>>>>
> >>>>> I am facing this error:
> >>>>>
> >>>>> Cannot output with timestamp 2019-08-16T12:30:15.120Z. Output timestamps
> >>>>> must be no earlier than the timestamp of the current input
> >>>>> (2019-08-16T12:39:06.887Z) minus the allowed skew (0 milliseconds).
> >>>>>
> >>>>> I searched online but still don't quite understand it, so I am asking
> >>>>> here for some help.
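The condition in that error can be restated in plain Java. `TimestampSkewRule` is a hypothetical helper, not Beam's implementation: an output timestamp is accepted only if it is no earlier than the input timestamp minus the allowed skew. Plugging in the two timestamps from the message shows that the first record is 531,767 ms (8 minutes 51.767 seconds) earlier than the input element, so a zero skew rejects it.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java restatement of the rule quoted in the error message
// (hypothetical helper, not Beam's implementation).
final class TimestampSkewRule {

    /** Accept outputTs only if outputTs >= inputTs - allowedSkew. */
    static boolean isAllowed(Instant inputTs, Instant outputTs, Duration allowedSkew) {
        return !outputTs.isBefore(inputTs.minus(allowedSkew));
    }

    public static void main(String[] args) {
        Instant input = Instant.parse("2019-08-16T12:39:06.887Z"); // "current input"
        Instant output = Instant.ofEpochMilli(1565958615120L);      // first line of the file
        System.out.println(output);                                          // 2019-08-16T12:30:15.120Z
        System.out.println(Duration.between(output, input).toMillis());      // 531767
        System.out.println(isAllowed(input, output, Duration.ZERO));         // false
        System.out.println(isAllowed(input, output, Duration.ofMinutes(10))); // true
    }
}
```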
> >>>>>
> >>>>> A file has some past timestamps in it (epoch milliseconds):
> >>>>> 1565958615120
> >>>>> 1565958615120
> >>>>> 1565958615121
> >>>>>
> >>>>> My code looks something like this:
> >>>>>
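> >>>>> import java.io.File;
> >>>>> import org.apache.beam.sdk.Pipeline;
> >>>>> import org.apache.beam.sdk.io.TextIO;
> >>>>> import org.apache.beam.sdk.options.PipelineOptions;
> >>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> >>>>> import org.apache.beam.sdk.transforms.DoFn;
> >>>>> import org.apache.beam.sdk.transforms.ParDo;
> >>>>> import org.apache.beam.sdk.transforms.Watch;
> >>>>> import org.apache.beam.sdk.values.PCollection;
> >>>>> import org.joda.time.Duration;
> >>>>> import org.joda.time.Instant;
> >>>>>
> >>>>> static class ReadWithEventTime extends DoFn<String, String> {
> >>>>>     @ProcessElement
> >>>>>     public void processElement(@Element String line,
> >>>>>                                OutputReceiver<String> out) {
> >>>>>         // Re-stamps each line with the epoch millis it contains. With the
> >>>>>         // default zero skew this fails when that time is earlier than the
> >>>>>         // input element's own timestamp.
> >>>>>         out.outputWithTimestamp(line, new Instant(Long.parseLong(line)));
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> public static void main(String[] args) {
> >>>>>     PipelineOptions options = PipelineOptionsFactory.create();
> >>>>>     Pipeline pipeline = Pipeline.create(options);
> >>>>>
> >>>>>     String sourcePath = new File("files/").getPath();
> >>>>>
> >>>>>     // Poll for new matching files every 5 seconds, never stopping.
> >>>>>     PCollection<String> data = pipeline.apply("ReadData",
> >>>>>             TextIO.read().from(sourcePath + "/test*")
> >>>>>                     .watchForNewFiles(Duration.standardSeconds(5),
> >>>>>                             Watch.Growth.<String>never()));
> >>>>>
> >>>>>     data.apply("ReadWithEventTime", ParDo.of(new ReadWithEventTime()));
> >>>>>
> >>>>>     pipeline.run().waitUntilFinish();
> >>>>> }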
> >>>>>
> >>>>>
> >>>>> I am trying to understand where "current input
> >>>>> (2019-08-16T12:39:06.887Z)" in the error message is coming from. Is it
> >>>>> the lowest watermark when I start my application? If that's the case,
> >>>>> is there a way I can change the initial watermark?
> >>>>>
> >>>>> Also, I could set up `withAllowedTimestampSkew`, but it looks like it
> >>>>> has been deprecated.
> >>>>>
> >>>>> Any suggestion would be appreciated. Thank you!
> >>>>>
> >>>>> Best,
> >>>>> Chengzhi
> >>>>>
>
