[ https://issues.apache.org/jira/browse/FLINK-29926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-29926:
-----------------------------------
Labels: 1.15 Flink ReadFile stale-critical (was: 1.15 Flink ReadFile)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue has been marked as
Critical but is unassigned and neither itself nor its Sub-Tasks have been
updated for 14 days. I have gone ahead and marked it "stale-critical". If this
ticket is critical, please either assign yourself or give an update.
Afterwards, please remove the label or in 7 days the issue will be
deprioritized.
> File source continuous monitoring mode ignoring files during savepoint
> upgrade mode
> -----------------------------------------------------------------------------------
>
> Key: FLINK-29926
> URL: https://issues.apache.org/jira/browse/FLINK-29926
> Project: Flink
> Issue Type: Bug
> Reporter: Avinash
> Priority: Critical
> Labels: 1.15, Flink, ReadFile, stale-critical
>
> During a stateful application upgrade using the Flink Kubernetes operator, the
> StreamExecutionEnvironment.readFile() operator running in
> FileProcessingMode.PROCESS_CONTINUOUSLY mode fails to detect any new
> changes made to the same file in the directory.
>
> *Background*: We have a fresh deployment of the application, created with the
> Kubernetes operator, with savepoint as the upgrade mode and checkpointing
> enabled.
> The env.readFile() operator in FileProcessingMode.PROCESS_CONTINUOUSLY mode
> starts continuously monitoring the directory (an S3 prefix) for changes and
> also checkpoints at the configured interval.
> {noformat}
> 2022-11-07 10:47:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source:
> Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Opened ContinuousFileMonitoringFunction (taskIdx= 0) for path:
> s3://test-app/configs
> ...
> ...
> 2022-11-07 10:47:13.996 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy
> Source Thread - Source: Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Ignoring s3://test-app/configs/control-event-config.json, with mod time=
> 1667817365000 and global mod time= 1667817365000
> ...
> ...
> 2022-11-07 10:51:40.896 UTC taskmanager-1-1 [priority='DEBUG' thread='Source:
> Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - ContinuousFileMonitoringFunction checkpointed 1667817365000.{noformat}
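The debug lines above suggest the rule the monitor applies: a file is skipped when its modification time is not newer than globalModificationTime, and the checkpoint stores that single long. The following is a minimal, self-contained Java model of that rule, inferred from the logs. ModTimeFilter and ModTimeFilterDemo are hypothetical names, not Flink classes, and the Long.MIN_VALUE starting value is an assumption.

```java
// Hypothetical model of the modification-time filter suggested by the
// "Ignoring ..." debug lines; this is not the Flink class itself.
final class ModTimeFilter {
    // Largest modification time already forwarded (assumed to start at Long.MIN_VALUE).
    private long globalModificationTime = Long.MIN_VALUE;

    // Skip a file whose modification time is not newer than the global one.
    boolean shouldIgnore(long modificationTime) {
        return modificationTime <= globalModificationTime;
    }

    // One monitoring pass over a single file; returns true if the file is forwarded.
    boolean scan(String path, long modificationTime) {
        if (shouldIgnore(modificationTime)) {
            System.out.println("Ignoring " + path + ", with mod time= " + modificationTime
                    + " and global mod time= " + globalModificationTime);
            return false;
        }
        globalModificationTime = Math.max(globalModificationTime, modificationTime);
        return true;
    }

    // The single long value that would be written to checkpoint state.
    long snapshot() {
        return globalModificationTime;
    }
}

final class ModTimeFilterDemo {
    public static void main(String[] args) {
        ModTimeFilter filter = new ModTimeFilter();
        String file = "s3://test-app/configs/control-event-config.json";
        filter.scan(file, 1667817365000L); // first pass: forwarded
        filter.scan(file, 1667817365000L); // unchanged file: ignored
        System.out.println("checkpointed " + filter.snapshot());
    }
}
```

Equal modification times are ignored, which matches the "mod time= 1667817365000 and global mod time= 1667817365000" line. Only a strictly newer file gets through.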
> Next, we upgrade the application using the Kubernetes operator. The operator
> takes a savepoint using the "Cancel with savepoint" suspend mechanism.
> Cancelling invokes the source's cancel() method, which in turn sets
> globalModificationTime = Long.MAX_VALUE, and only then is the savepoint taken.
> {noformat}
> 2022-11-07 10:54:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy
> Source Thread - Source: Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Ignoring s3://test-app/configs/control-event-config.json, with mod time=
> 1667817365000 and global mod time= 1667817365000
> ...
> 2022-11-07 10:55:12.899 UTC taskmanager-1-1 [priority='DEBUG' thread='Source:
> Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - ContinuousFileMonitoringFunction checkpointed 9223372036854775807
> ....
> 2022-11-07 10:55:13.090 UTC taskmanager-1-1 [priority='DEBUG' thread='Source:
> Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Closed File Monitoring Source for path: s3://test-app/{noformat}
> As a result, globalModificationTime changes from 1667817365000 to
> Long.MAX_VALUE (9223372036854775807), and this value is stored in the savepoint state.
> When the application restarts with the new changes, the env.readFile()
> operator restores that state, in which globalModificationTime =
> Long.MAX_VALUE, and ignores every change made to the file after the upgrade:
> {noformat}
> 2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='INFO' thread='Source:
> Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Restoring state for the ContinuousFileMonitoringFunction
> ....
> 2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source:
> Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - ContinuousFileMonitoringFunction retrieved a global mod time of
> 9223372036854775807
> ....
> 2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source:
> Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Opened ContinuousFileMonitoringFunction (taskIdx= 0) for path:
> s3://test-app/configs
> ....
> 2022-11-07 11:00:13.996 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy
> Source Thread - Source: Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Ignoring s3://test-app/configs/control-event-config.json, with mod time=
> 1667821399000 and global mod time= 9223372036854775807
> ...
> ...
> 2022-11-07 11:01:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy
> Source Thread - Source: Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Ignoring s3://test-app/configs/control-event-config.json, with mod time=
> 1667821399000 and global mod time= 9223372036854775807
> ...
> 2022-11-07 11:02:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy
> Source Thread - Source: Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Ignoring s3://test-app/configs/control-event-config.json, with mod time=
> 1667821399000 and global mod time= 9223372036854775807{noformat}
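Once the restored value is Long.MAX_VALUE, the "not newer than" rule inferred from the logs can never let a file through, because no long is greater than Long.MAX_VALUE. The following self-contained sketch contrasts that restore with one from the last real progress value. RestoredMonitor and RestoreDemo are hypothetical names used only for illustration.

```java
// Hypothetical model of the restore path after the upgrade; not Flink code.
final class RestoredMonitor {
    private long globalModificationTime = Long.MIN_VALUE;

    // Models state restore: adopt the single long held in the savepoint.
    void restore(long savedGlobalModificationTime) {
        globalModificationTime = savedGlobalModificationTime;
    }

    // Same rule as the "Ignoring ..." debug lines: skip unless strictly newer.
    boolean shouldIgnore(long modificationTime) {
        return modificationTime <= globalModificationTime;
    }
}

final class RestoreDemo {
    public static void main(String[] args) {
        RestoredMonitor afterUpgrade = new RestoredMonitor();
        afterUpgrade.restore(Long.MAX_VALUE);
        // No long can exceed Long.MAX_VALUE, so every file is ignored from now on.
        System.out.println(afterUpgrade.shouldIgnore(1667821399000L)); // true
        System.out.println(afterUpgrade.shouldIgnore(Long.MAX_VALUE));  // true

        RestoredMonitor expected = new RestoredMonitor();
        expected.restore(1667817365000L);
        System.out.println(expected.shouldIgnore(1667821399000L));     // false
    }
}
```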
> Cause: The issue above appears to be caused by the reassignment of
> globalModificationTime to Long.MAX_VALUE in cancel():
> [https://github.com/apache/flink/blob/a77747892b1724fa5ec388c2b0fe519db32664e9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java#L389]
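One possible direction, sketched here only as a hypothetical mitigation and not as the upstream fix, is for cancel() to stop the monitoring loop without touching the value that gets checkpointed. A savepoint taken after cancellation would then keep the real progress. This assumes nothing else relies on the Long.MAX_VALUE sentinel being present in the snapshot. SentinelFreeMonitor and SentinelFreeDemo are illustrative names, not Flink API.

```java
// Hypothetical mitigation sketch: cancel() stops the loop but leaves progress intact.
final class SentinelFreeMonitor {
    private final Object checkpointLock = new Object();
    private long globalModificationTime = Long.MIN_VALUE;
    private volatile boolean isRunning = true;

    void onFileForwarded(long modificationTime) {
        synchronized (checkpointLock) {
            globalModificationTime = Math.max(globalModificationTime, modificationTime);
        }
    }

    // Only signals the loop to exit; the checkpointed value is not overwritten.
    void cancel() {
        isRunning = false;
    }

    long snapshotState() {
        synchronized (checkpointLock) {
            return globalModificationTime;
        }
    }

    void restore(long saved) {
        synchronized (checkpointLock) {
            globalModificationTime = saved;
        }
    }

    boolean shouldIgnore(long modificationTime) {
        synchronized (checkpointLock) {
            return modificationTime <= globalModificationTime;
        }
    }

    boolean isRunning() {
        return isRunning;
    }
}

final class SentinelFreeDemo {
    public static void main(String[] args) {
        SentinelFreeMonitor before = new SentinelFreeMonitor();
        before.onFileForwarded(1667817365000L);
        before.cancel();
        long savepoint = before.snapshotState(); // real progress, not the sentinel

        SentinelFreeMonitor after = new SentinelFreeMonitor();
        after.restore(savepoint);
        System.out.println(after.shouldIgnore(1667821399000L)); // false: change is picked up
    }
}
```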
>
>