Hi all,

This is to open a discussion on how to better handle event-time in continuous 
file processing.

For the sake of illustration of the problem we will use the example of 
processing hourly server logs. 
In this case, each server writes its logs in hourly files, with names:


        1) we have two servers producing logs server-1 and server-2
        2) they have produced one file each, e.g. for 10am to 11am, so 
server-1-10, server-2-10
        3) our job has a parallelism of 2, so the 
ContinuousFileMonitoringFunction has parallelism 1 and the reader 2
        4) records within each file have timestamps in order, or moderately 
        5) each log file is split into two splits by the underlying filesystem, 
e.g. server-1-10-1 and server-1-10-2

In the scenario above, and in the current implementation of the continuous file 
processing, the monitoring function will:
        1) sort the files on ascending modification time, 
        2) compute the splits of each of the files and 
        3) forward the splits in order of the modification timestamp and their 
offset in the file to the downstream readers randomly. 

Given the above, reader-1 will take server-1-10-1, and reader-2, server-1-10-2.
Focusing on reader-1, as soon as it gets its split, it will start reading the 
contained elements and assign timestamps to them 
based on a user-specified timestamp extractor (this may happen later in the 
pipeline bit it does not break the generality of the problem).
In addition, given that we are operating in event time, the reader will also 
start emitting watermarks based on the timestamps 
assigned to the elements it has read. 

In this case, after processing server-1-10-1 and server-1-10-2 by the 2 
readers, the watermark will have advanced somewhere in the 
middle of the timestamps included in the file (files have logs for 10 to 11 

In this case, when the splits of file server-2-10 are to be processed, elements 
in the beginning of the file will be dropped as late.

Proposed Solution:

To face this, we could do the following:

1) Split the files (and their corresponding splits) in file-groups e.g. based 
on a user-specified parser of the filename.
2) Files/splits within the same file-group should be ordered so that 
server-1-10 is processed 
before server-1-11. This can be done through the same filename parser mentioned 
3) In each reader task, keep a watermark emitter/ timestamp extractor and a 
(candidate) watermark per file-group. The watermark 
emitted by each task should be the minimum across all its file-groups.

