[ https://issues.apache.org/jira/browse/FLINK-28274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17561034#comment-17561034 ]
zl commented on FLINK-28274:
----------------------------
bq. The fix is either ensuring that maxParallelism for the monitoring function
is set correctly, or somehow telling the adaptive scheduler to not change the
parallelism of that function (in a generic way).
I prefer to set the max parallelism for the monitoring function correctly.
Since ContinuousFileMonitoringFunction is a legacy source function, we should
not change the adaptive scheduler for this issue.
[AdaptiveScheduler.java#L344|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java#L344]
calculates the maxParallelism for a vertex only if no max parallelism was
configured. So if we ensure that the maxParallelism of the monitoring function
is 1, reactive mode will not change the parallelism of
`ContinuousFileMonitoringFunction`.
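To illustrate the rule above, here is a minimal, hypothetical Java model (not Flink code; the class and method names are invented) of how reactive mode picks a vertex's parallelism. It assumes reactive mode scales each vertex up to its effective max parallelism, bounded by the available slots, and that the max parallelism is derived only when none is configured. The default bounds (128 and 32768) are an assumption modeled on our reading of `KeyGroupRangeAssignment.computeDefaultMaxParallelism`.

```java
import java.util.OptionalInt;

/** Hypothetical, simplified model of reactive-mode parallelism. Not Flink code. */
public class ReactiveParallelismSketch {

    // Assumed bounds for a derived default max parallelism.
    static final int LOWER_BOUND_MAX_PARALLELISM = 1 << 7;  // 128
    static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15; // 32768

    /** Rounds x (x >= 1) up to the next power of two. */
    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }

    /** Default max parallelism derived from the vertex parallelism. */
    static int defaultMaxParallelism(int parallelism) {
        int candidate = roundUpToPowerOfTwo(parallelism + parallelism / 2);
        return Math.min(
                Math.max(candidate, LOWER_BOUND_MAX_PARALLELISM), UPPER_BOUND_MAX_PARALLELISM);
    }

    /** The max parallelism is derived only if the user did not configure one. */
    static int effectiveMaxParallelism(int parallelism, OptionalInt configuredMaxParallelism) {
        return configuredMaxParallelism.isPresent()
                ? configuredMaxParallelism.getAsInt()
                : defaultMaxParallelism(parallelism);
    }

    /** Assumed reactive-mode rule: scale up to the max parallelism, bounded by free slots. */
    static int reactiveParallelism(
            int parallelism, OptionalInt configuredMaxParallelism, int availableSlots) {
        return Math.min(
                effectiveMaxParallelism(parallelism, configuredMaxParallelism), availableSlots);
    }

    public static void main(String[] args) {
        int availableSlots = 2;
        // Monitoring source without an explicit max parallelism: scaled from 1 to 2.
        System.out.println(reactiveParallelism(1, OptionalInt.empty(), availableSlots)); // 2
        // Monitoring source pinned with max parallelism 1: stays at 1.
        System.out.println(reactiveParallelism(1, OptionalInt.of(1), availableSlots)); // 1
    }
}
```

Under these assumptions, with two available slots the unpinned source goes from parallelism 1 to 2 (compare the `(1/1)` to `(2/2)` transition in the logs quoted below), while a configured max parallelism of 1 keeps it at 1.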
Since `ContinuousFileMonitoringFunction` is only used in
[StreamExecutionEnvironment.java#L1855|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1855],
it is safe to explicitly set the max parallelism at that call site.
> ContinuousFileMonitoringFunction doesn't work with reactive mode
> ----------------------------------------------------------------
>
> Key: FLINK-28274
> URL: https://issues.apache.org/jira/browse/FLINK-28274
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream, Runtime / Coordination
> Affects Versions: 1.16.0
> Reporter: Robert Metzger
> Priority: Major
>
> This issue was first reported in the Flink Slack:
> https://apache-flink.slack.com/archives/C03G7LJTS2G/p1656257678477659
> It seems that reactive mode is changing the parallelism of the
> `ContinuousFileMonitoringFunction`, which is supposed to always run with a
> parallelism of 1.
> This is the error:
> {code}
> INITIALIZING to FAILED with failure cause:
> java.lang.IllegalArgumentException: ContinuousFileMonitoringFunction retrieved invalid state.
>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
> {code}
> You can see from the logs that the parallelism is changing on a rescale event:
> {code}
> 2022-06-27 13:38:54,979 | INFO | .executiongraph.ExecutionGraph | Source: Custom File Source (1/1) (cbaad20beee908b95c9fe5c34ba76bfa) switched from RUNNING to CANCELING.
> 2022-06-27 13:38:55,254 | INFO | .executiongraph.ExecutionGraph | Source: Custom File Source (1/1) (cbaad20beee908b95c9fe5c34ba76bfa) switched from CANCELING to CANCELED.
> 2022-06-27 13:38:55,657 | INFO | .executiongraph.ExecutionGraph | Source: Custom File Source (2/2) (6ceaacbe8d9aa507b0a56c850082da8c) switched from DEPLOYING to INITIALIZING.
> 2022-06-27 13:38:55,722 | INFO | .executiongraph.ExecutionGraph | Source: Custom File Source (2/2) (6ceaacbe8d9aa507b0a56c850082da8c) switched from INITIALIZING to RUNNING.
> 2022-06-27 13:44:54,058 | INFO | .executiongraph.ExecutionGraph | Source: Custom File Source (1/2) (665b12194741744d6bba4408a252fa45) switched from RUNNING to CANCELING.
> 2022-06-27 13:45:00,825 | INFO | .executiongraph.ExecutionGraph | Source: Custom File Source (1/2) (3cc408fd0eb9ddfa97b22f4dfc09d8dc) switched from DEPLOYING to INITIALIZING.
> 2022-06-27 13:45:00,826 | INFO | .executiongraph.ExecutionGraph | Source: Custom File Source (1/2) (3cc408fd0eb9ddfa97b22f4dfc09d8dc) switched from INITIALIZING to RUNNING.
> 2022-06-27 13:45:01,434 | INFO | .executiongraph.ExecutionGraph | Source: Custom File Source (2/2) (79338042d84b6458c34760bc85145512) switched from DEPLOYING to INITIALIZING.
> 2022-06-27 13:45:02,427 | INFO | .executiongraph.ExecutionGraph | Source: Custom File Source (2/2) (79338042d84b6458c34760bc85145512) switched from INITIALIZING to RUNNING.
> {code}
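The `IllegalArgumentException` in the description above is raised through `Preconditions.checkArgument` when the monitoring function restores its state. As we read the source, that check accepts at most one restored entry, because the function assumes it runs with parallelism 1. The following is a hypothetical simulation, not Flink code. It assumes each running instance snapshots exactly one timestamp entry, and it models Flink's even-split list-state redistribution as a simple round-robin assignment. Under those assumptions, state written by two instances and restored into a single instance trips the check.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical simulation of restoring the monitoring function's list state. Not Flink code. */
public class MonitoringStateRestoreSketch {

    /** Simplified even-split redistribution: entry i goes to subtask i % newParallelism. */
    static List<List<Long>> redistribute(List<Long> entries, int newParallelism) {
        List<List<Long>> perSubtask = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int i = 0; i < entries.size(); i++) {
            perSubtask.get(i % newParallelism).add(entries.get(i));
        }
        return perSubtask;
    }

    /** Restore-time check: at most one entry, since the function assumes parallelism 1. */
    static void restore(List<Long> retrievedStates) {
        if (retrievedStates.size() > 1) {
            throw new IllegalArgumentException(
                    "ContinuousFileMonitoringFunction retrieved invalid state.");
        }
    }

    /** Restores every subtask's share of the checkpoint and reports the outcome. */
    static void restoreAll(List<Long> checkpoint, int newParallelism) {
        for (List<Long> subtaskState : redistribute(checkpoint, newParallelism)) {
            try {
                restore(subtaskState);
                System.out.println("restored " + subtaskState);
            } catch (IllegalArgumentException e) {
                System.out.println("failed: " + e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        // Two instances were running (parallelism 2), so the checkpoint holds two entries.
        List<Long> checkpoint = Arrays.asList(1000L, 2000L);
        restoreAll(checkpoint, 2); // each subtask gets one entry: passes the check
        restoreAll(checkpoint, 1); // one subtask gets both entries: fails the check
    }
}
```

Pinning the monitoring function to a max parallelism of 1 avoids this path, since only one instance ever writes an entry.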