[
https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16251363#comment-16251363
]
Juan Miguel Cejuela commented on FLINK-8046:
--------------------------------------------
Hi [~kkl0u]!
I am also copying here the answer I sent you through the mailing list. Let's
keep the conversation here from now on ;)
---
Hi Kostas,
thank you very much for your answer.
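Yes, in https://github.com/apache/flink/pull/4997 I proposed changing the
comparison to modificationTime < globalModificationTime (strictly smaller, so
that equal timestamps are no longer ignored). Later, however, I realized, as
you correctly point out, that this creates duplicates, because a file whose
timestamp equals globalModificationTime is picked up again on later scans.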
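To make the trade-off concrete, here is a minimal sketch of both comparisons. It is a simplified model and not Flink's actual code: the class ModTimeComparisonSketch, the methods runScans and exampleScans, and the file names are all hypothetical, and I assume globalModificationTime is advanced to the largest forwarded timestamp at the end of each directory scan.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the monitoring loop (not Flink's class).
class ModTimeComparisonSketch {

    // Runs the given directory scans and returns the file names that were forwarded.
    // ignoreEqual = true models "<=", ignoreEqual = false models "<".
    static List<String> runScans(List<Map<String, Long>> scans, boolean ignoreEqual) {
        long globalModificationTime = Long.MIN_VALUE;
        List<String> forwarded = new ArrayList<>();
        for (Map<String, Long> scan : scans) {
            long maxSeen = globalModificationTime;
            for (Map.Entry<String, Long> file : scan.entrySet()) {
                long modificationTime = file.getValue();
                boolean shouldIgnore = ignoreEqual
                        ? modificationTime <= globalModificationTime
                        : modificationTime < globalModificationTime;
                if (!shouldIgnore) {
                    forwarded.add(file.getKey());
                    maxSeen = Math.max(maxSeen, modificationTime);
                }
            }
            // Assumption: the global time only advances once the scan is done.
            globalModificationTime = maxSeen;
        }
        return forwarded;
    }

    // Two scans: b.txt appears in the second scan with the same timestamp as a.txt.
    static List<Map<String, Long>> exampleScans() {
        Map<String, Long> first = new LinkedHashMap<>();
        first.put("a.txt", 100L);
        Map<String, Long> second = new LinkedHashMap<>();
        second.put("a.txt", 100L);
        second.put("b.txt", 100L);
        List<Map<String, Long>> scans = new ArrayList<>();
        scans.add(first);
        scans.add(second);
        return scans;
    }

    public static void main(String[] args) {
        // "<=" never forwards b.txt; "<" forwards a.txt twice.
        System.out.println("<= : " + runScans(exampleScans(), true));
        System.out.println("<  : " + runScans(exampleScans(), false));
    }
}
```

In this model, "<=" loses b.txt and "<" forwards a.txt a second time, which is why neither comparison on its own looks sufficient.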
The original and now deprecated FileMonitoringFunction.java indeed kept a map
of filenames to their timestamps.
That works. However, this memory consumption is likely too much for my
application, as I may process millions of files.
What I’ve done so far is to create my own
MyPatchedContinuousFileMonitoringFunction, which has a similar map but
implements it with a LinkedHashMap that caps the map at a desired maximum
number of entries, as in:
    private volatile Map<String, Boolean> filenamesAlreadySeen =
        new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > MAX_ENTRIES;
            }
        };
and then changed shouldIgnore to:
    private boolean shouldIgnore(Path filePath, long modificationTime) {
        assert (Thread.holdsLock(checkpointLock));
        boolean alreadySeen = filenamesAlreadySeen.containsKey(filePath.getName());
        boolean shouldIgnore = alreadySeen || modificationTime < globalModificationTime;
        filenamesAlreadySeen.put(filePath.getName(), true);
        if (shouldIgnore && LOG.isDebugEnabled()) {
            LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime +
                " and global mod time= " + globalModificationTime);
        }
        return shouldIgnore;
    }
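For reference, the same idea can be tried outside of Flink. The following is only a sketch under stated assumptions: BoundedSeenFilesSketch and markForwarded are hypothetical names, plain String file names stand in for Flink's Path, and locking, checkpointing and logging are left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical standalone version of the bounded "already seen" check.
class BoundedSeenFilesSketch {

    private final Map<String, Boolean> filenamesAlreadySeen;
    private long globalModificationTime = Long.MIN_VALUE;

    BoundedSeenFilesSketch(final int maxEntries) {
        // Insertion-ordered map that drops its oldest entry once it grows past maxEntries.
        this.filenamesAlreadySeen = new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // Ignores a file if its name is still remembered or if it is strictly older.
    boolean shouldIgnore(String fileName, long modificationTime) {
        boolean alreadySeen = filenamesAlreadySeen.containsKey(fileName);
        boolean shouldIgnore = alreadySeen || modificationTime < globalModificationTime;
        filenamesAlreadySeen.put(fileName, true);
        return shouldIgnore;
    }

    // Assumption: the caller advances the global time after forwarding a file.
    void markForwarded(long modificationTime) {
        globalModificationTime = Math.max(globalModificationTime, modificationTime);
    }

    public static void main(String[] args) {
        BoundedSeenFilesSketch monitor = new BoundedSeenFilesSketch(2);
        System.out.println(monitor.shouldIgnore("a.txt", 100L)); // new file
        monitor.markForwarded(100L);
        System.out.println(monitor.shouldIgnore("b.txt", 100L)); // same timestamp, new name
        System.out.println(monitor.shouldIgnore("a.txt", 100L)); // still remembered
        System.out.println(monitor.shouldIgnore("c.txt", 100L)); // new name, evicts a.txt
        System.out.println(monitor.shouldIgnore("a.txt", 100L)); // forgotten, accepted again
    }
}
```

The last call shows the limitation of the bounded map: once a name has been evicted, a file with a timestamp equal to globalModificationTime is accepted again, so MAX_ENTRIES has to be large enough for the number of files that can share the newest timestamp.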
This is a partial solution that works for me for now. However, it’s still a
hack and a very specific solution.
I think the real solution would be to also use the accessTime (not only the
modificationTime). However, as I pointed out in the GitHub pull request, as of
Flink 1.3.2 the access time is always 0, at least on my machine and local file
system (macOS).
> ContinuousFileMonitoringFunction wrongly ignores files with exact same
> timestamp
> --------------------------------------------------------------------------------
>
> Key: FLINK-8046
> URL: https://issues.apache.org/jira/browse/FLINK-8046
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.3.2
> Reporter: Juan Miguel Cejuela
> Labels: stream
> Fix For: 1.5.0
>
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> The current monitoring of files sets the internal variable
> `globalModificationTime` to filter out files that are "older". However, the
> current test (to check "older") does
> `boolean shouldIgnore = modificationTime <= globalModificationTime;` (from
> `shouldIgnore`)
> The comparison should strictly be SMALLER (NOT smaller or equal). The method
> documentation also states "This happens if the modification time of the file
> is _smaller_ than...".
> Accepting equality as "older" causes some files with the exact same
> timestamp to be ignored. The behavior is also non-deterministic, because the
> first file to be accepted ("first" being pretty much random) causes the
> remaining files with the exact same timestamp to be ignored.