[ https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16251363#comment-16251363 ]

Juan Miguel Cejuela commented on FLINK-8046:
--------------------------------------------

Hi [~kkl0u] !

I am also copying my answer to you from the mailing list here. Let's keep
the conversation here from now on ;)

---

Hi Kostas,

thank you very much for your answer.

Yes, I proposed the change in https://github.com/apache/flink/pull/4997 to
compare with modificationTime < globalModificationTime (i.e. not accepting
equality). Later, however, I realized, as you correctly point out, that this
creates duplicates.
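To make the duplicate problem concrete, here is a simplified, hypothetical simulation (not Flink code). The class StrictComparisonDemo, its scan method, and the model itself are assumptions for illustration: every scan re-lists all files in the directory, and globalModificationTime advances to the newest forwarded timestamp once the scan is done.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the monitoring loop with the strict comparison from
// the pull request. Not Flink's implementation.
class StrictComparisonDemo {
    private long globalModificationTime = Long.MIN_VALUE;

    // Returns the files forwarded in this scan, in listing order.
    List<String> scan(String[] names, long[] modTimes) {
        List<String> forwarded = new ArrayList<>();
        long newest = globalModificationTime;
        for (int i = 0; i < names.length; i++) {
            boolean shouldIgnore = modTimes[i] < globalModificationTime; // strict
            if (!shouldIgnore) {
                forwarded.add(names[i]);
                newest = Math.max(newest, modTimes[i]);
            }
        }
        // Advance the global time only after the whole scan.
        globalModificationTime = newest;
        return forwarded;
    }

    public static void main(String[] args) {
        StrictComparisonDemo monitor = new StrictComparisonDemo();
        System.out.println(monitor.scan(new String[] {"a.txt"}, new long[] {1000L}));
        // b.txt appears later with the same timestamp while a.txt is still there.
        System.out.println(monitor.scan(new String[] {"a.txt", "b.txt"},
                new long[] {1000L, 1000L}));
    }
}
```

The first scan prints [a.txt] and the second prints [a.txt, b.txt]: b.txt is no longer lost, but a.txt is forwarded a second time because its timestamp equals, and is therefore not strictly smaller than, globalModificationTime.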

The original and now deprecated FileMonitoringFunction.java indeed kept a map 
of filenames to their timestamps.

That works. However, the memory consumption of such a map is likely too
high for my application, as I may process millions of files.
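For comparison, the per-file bookkeeping idea can be sketched like this. It is a hypothetical illustration of the approach, not the deprecated FileMonitoringFunction's actual code; PerFileTimestampFilter, accept and trackedFiles are names made up for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: remember the last modification time seen for each
// file name and only process a file that is new or whose time changed.
class PerFileTimestampFilter {
    private final Map<String, Long> lastSeenModTime = new HashMap<>();

    // Returns true if the file should be processed.
    boolean accept(String fileName, long modificationTime) {
        Long previous = lastSeenModTime.put(fileName, modificationTime);
        return previous == null || previous != modificationTime;
    }

    // Number of file names currently remembered (never shrinks).
    int trackedFiles() {
        return lastSeenModTime.size();
    }
}
```

Files with identical timestamps are handled correctly, because each name is tracked on its own. The cost is that every distinct file name stays in the map for the lifetime of the job, so millions of files mean millions of entries.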

What I’ve done so far is to create my own
MyPatchedContinuousFileMonitoringFunction that keeps a similar map, but
implemented as a LinkedHashMap so that its size is capped at a chosen
maximum number of entries:

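// Bounded "already seen" map: once it holds more than MAX_ENTRIES names,
// LinkedHashMap evicts the eldest (first-inserted) entry after each put.
// MAX_ENTRIES is assumed to be an int constant defined elsewhere in the class.
private volatile Map<String, Boolean> filenamesAlreadySeen = new LinkedHashMap<String, Boolean>() {

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
        return size() > MAX_ENTRIES;
    }
};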

and then changed shouldIgnore to:

private boolean shouldIgnore(Path filePath, long modificationTime) {
    assert (Thread.holdsLock(checkpointLock));
    // Keyed by file name only (filePath.getName()), not by the full path.
    boolean alreadySeen = filenamesAlreadySeen.containsKey(filePath.getName());
    boolean shouldIgnore = alreadySeen || modificationTime < globalModificationTime;
    // Record the name even when the file is ignored.
    filenamesAlreadySeen.put(filePath.getName(), true);

    if (shouldIgnore && LOG.isDebugEnabled()) {
        LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime +
            " and global mod time= " + globalModificationTime);
    }
    return shouldIgnore;
}
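A standalone sketch of the patched check may help to test it outside Flink. Assumptions: the checkpoint lock and logging are dropped, files are identified by a plain name string, MAX_ENTRIES becomes a constructor parameter, and markForwarded is a hypothetical hook standing in for the point where the real function advances globalModificationTime.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical standalone version of the patched filter (not Flink code).
class PatchedFileFilter {
    private final int maxEntries;
    private final Map<String, Boolean> filenamesAlreadySeen;
    private long globalModificationTime = Long.MIN_VALUE;

    PatchedFileFilter(int maxEntries) {
        this.maxEntries = maxEntries;
        // Insertion-ordered map that drops its oldest entry once it exceeds maxEntries.
        this.filenamesAlreadySeen = new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // Same rule as the patched shouldIgnore: skip names already seen and
    // files strictly older than globalModificationTime.
    boolean shouldIgnore(String fileName, long modificationTime) {
        boolean alreadySeen = filenamesAlreadySeen.containsKey(fileName);
        boolean shouldIgnore = alreadySeen || modificationTime < globalModificationTime;
        filenamesAlreadySeen.put(fileName, true);
        return shouldIgnore;
    }

    // Hypothetical hook: advance the global time after a file was forwarded.
    void markForwarded(long modificationTime) {
        globalModificationTime = Math.max(globalModificationTime, modificationTime);
    }
}
```

The bound trades memory for correctness. With maxEntries = 1, for example: forward a.txt at time 1000, then see b.txt with the same timestamp (which evicts a.txt), and a.txt is accepted a second time. One possible tweak is to build the map with the three-argument LinkedHashMap(initialCapacity, loadFactor, true) constructor, so that it uses access order. Each put of an existing name then moves that name to the back, so names that are still being listed are less likely to be evicted.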

This is a partial solution that works for me for now. However, it is still
a hack and a very application-specific solution.

I think the real solution would be to also use the access time (not only
the modification time). However, as I pointed out in the GitHub pull request,
as of Flink 1.3.2 the access time is always 0, at least on my machine with
the local file system (macOS).
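To check what access time the operating system itself reports, a minimal probe can read a file's attributes through the JDK's java.nio API, independently of Flink's FileSystem and FileStatus classes. AccessTimeProbe, times and probeTempFile are hypothetical names. According to the BasicFileAttributes documentation, a file system without access-time support returns an implementation-specific default, typically the last-modified time or the epoch. Mount options such as noatime can also keep the value from updating.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;

// Hypothetical probe: reads access and modification times via java.nio.
class AccessTimeProbe {
    // Returns {accessTimeMillis, modificationTimeMillis} as reported by the JDK.
    static long[] times(Path file) {
        try {
            BasicFileAttributes attrs = Files.readAttributes(file, BasicFileAttributes.class);
            return new long[] {
                attrs.lastAccessTime().toMillis(),
                attrs.lastModifiedTime().toMillis()
            };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Creates a throwaway file, reads its times, and deletes it again.
    static long[] probeTempFile() {
        try {
            Path file = Files.createTempFile("atime-probe", ".txt");
            try {
                return times(file);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] t = args.length > 0 ? times(Paths.get(args[0])) : probeTempFile();
        System.out.println("accessTime=" + t[0] + " ms, modificationTime=" + t[1] + " ms");
    }
}
```

If this prints a non-zero access time for a file for which Flink reports 0, that would suggest the zero comes from Flink's local file system layer rather than from macOS.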

> ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-8046
>                 URL: https://issues.apache.org/jira/browse/FLINK-8046
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.2
>            Reporter: Juan Miguel Cejuela
>              Labels: stream
>             Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The current file monitoring sets the internal variable
> `globalModificationTime` to filter out files that are "older". However, the
> current check for "older" (in `shouldIgnore`) is
> `boolean shouldIgnore = modificationTime <= globalModificationTime;`
> The comparison should strictly be SMALLER (NOT smaller or equal). The method 
> documentation also states "This happens if the modification time of the file 
> is _smaller_ than...".
> Treating equal timestamps as "older" causes some files with exactly the same
> timestamp to be ignored. The behavior is also non-deterministic: the first
> file to be accepted ("first" being essentially random) causes the remaining
> files with exactly the same timestamp to be ignored.
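The loss described in the report can be reproduced with a simplified, hypothetical model (not Flink code). NonStrictComparisonDemo and scan are illustrative names. The model assumes that each scan re-lists all files and that globalModificationTime advances to the newest forwarded timestamp only after the scan completes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the reported behavior, using the original
// `modificationTime <= globalModificationTime` check.
class NonStrictComparisonDemo {
    private long globalModificationTime = Long.MIN_VALUE;

    // Returns the files forwarded in this scan, in listing order.
    List<String> scan(String[] names, long[] modTimes) {
        List<String> forwarded = new ArrayList<>();
        long newest = globalModificationTime;
        for (int i = 0; i < names.length; i++) {
            boolean shouldIgnore = modTimes[i] <= globalModificationTime; // non-strict
            if (!shouldIgnore) {
                forwarded.add(names[i]);
                newest = Math.max(newest, modTimes[i]);
            }
        }
        globalModificationTime = newest;
        return forwarded;
    }

    public static void main(String[] args) {
        NonStrictComparisonDemo monitor = new NonStrictComparisonDemo();
        System.out.println(monitor.scan(new String[] {"a.txt"}, new long[] {1000L}));
        // b.txt arrives later with exactly the same timestamp as a.txt.
        System.out.println(monitor.scan(new String[] {"a.txt", "b.txt"},
                new long[] {1000L, 1000L}));
    }
}
```

The first scan prints [a.txt] and the second prints []: b.txt is never forwarded. In this model, same-timestamp files that are listed in the same scan are all accepted. Only those that appear in a later scan are lost.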



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
