This is to open a discussion on how to better handle event-time in continuous
For the sake of illustration of the problem we will use the example of
processing hourly server logs.
In this case, each server writes its logs in hourly files, with names:
1) we have two servers producing logs server-1 and server-2
2) they have produced one file each, e.g. for 10am to 11am, so
3) our job has a parallelism of 2, so the
ContinuousFileMonitoringFunction has parallelism 1 and the reader 2
4) records within each file have timestamps in order, or moderately
5) each log file is split into two splits by the underlying filesystem,
e.g. server-1-10-1 and server-1-10-2
In the scenario above, and in the current implementation of the continuous file
processing, the monitoring function will:
1) sort the files on ascending modification time,
2) compute the splits of each of the files and
3) forward the splits in order of the modification timestamp and their
offset in the file to the downstream readers randomly.
Given the above, reader-1 will take server-1-10-1, and reader-2, server-1-10-2.
Focusing on reader-1, as soon as it gets its split, it will start reading the
contained elements and assign timestamps to them
based on a user-specified timestamp extractor (this may happen later in the
pipeline bit it does not break the generality of the problem).
In addition, given that we are operating in event time, the reader will also
start emitting watermarks based on the timestamps
assigned to the elements it has read.
In this case, after processing server-1-10-1 and server-1-10-2 by the 2
readers, the watermark will have advanced somewhere in the
middle of the timestamps included in the file (files have logs for 10 to 11
In this case, when the splits of file server-2-10 are to be processed, elements
in the beginning of the file will be dropped as late.
To face this, we could do the following:
1) Split the files (and their corresponding splits) in file-groups e.g. based
on a user-specified parser of the filename.
2) Files/splits within the same file-group should be ordered so that
server-1-10 is processed
before server-1-11. This can be done through the same filename parser mentioned
3) In each reader task, keep a watermark emitter/ timestamp extractor and a
(candidate) watermark per file-group. The watermark
emitted by each task should be the minimum across all its file-groups.