elon_X created FLINK-35088:
------------------------------
Summary: watermark alignment maxAllowedWatermarkDrift and
updateInterval param need check
Key: FLINK-35088
URL: https://issues.apache.org/jira/browse/FLINK-35088
Project: Flink
Issue Type: Improvement
Components: API / Core, Runtime / Coordination
Affects Versions: 1.16.1
Reporter: elon_X
Attachments: image-2024-04-11-20-12-29-951.png
When using watermark alignment, I ran into two problems:
1. I initially assumed that a negative maxAllowedWatermarkDrift could be used to
delay consumption from the source, so I tried it. The upstream data flow then
hung indefinitely.
Root cause:
{code:java}
long maxAllowedWatermark =
        globalCombinedWatermark.getTimestamp()
                + watermarkAlignmentParams.getMaxAllowedWatermarkDrift();
{code}
If maxAllowedWatermarkDrift is negative, the condition maxAllowedWatermark <
lastEmittedWatermark holds in SourceOperator, so the SourceReader is blocked
indefinitely and cannot recover.
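The arithmetic behind the hang can be sketched as follows. This is only an illustration, not Flink code: the class and method names are hypothetical, the blocking condition is the one quoted above, and it assumes the global combined watermark is the minimum watermark over all aligned sources, so that every source's last emitted watermark is at least that value.

```java
final class NegativeDriftSketch {

    // Mirrors the expression from the root-cause snippet, using plain longs (milliseconds).
    static long maxAllowedWatermark(long globalCombinedWatermark, long maxAllowedWatermarkDrift) {
        return globalCombinedWatermark + maxAllowedWatermarkDrift;
    }

    // Blocking condition as described in this report: the reader waits while
    // the allowed watermark is below what it has already emitted.
    static boolean isBlocked(long maxAllowedWatermark, long lastEmittedWatermark) {
        return maxAllowedWatermark < lastEmittedWatermark;
    }

    public static void main(String[] args) {
        long globalCombined = 10_000L; // assumed: minimum over all aligned sources
        long slowestSource = 10_000L;  // the source that defines the minimum
        long fastestSource = 12_000L;  // a source that is ahead

        for (long drift : new long[] {-1_000L, 1_000L}) {
            long allowed = maxAllowedWatermark(globalCombined, drift);
            System.out.println("drift=" + drift
                    + " slowestBlocked=" + isBlocked(allowed, slowestSource)
                    + " fastestBlocked=" + isBlocked(allowed, fastestSource));
        }
    }
}
```

With these sample values, a drift of -1000 blocks both readers, including the slowest one, so nothing can advance the combined watermark again. A drift of +1000 blocks only the faster reader.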
I'm not sure whether a negative drift is meant to be supported by watermark
alignment. If it is not, parameter validation should be added so that a
negative value throws an exception on the client side.
2. The updateInterval parameter also lacks validation. If I set it to 0, the
job fails with an exception while the JobMaster is being created. The check
that throws is performed by the JDK class
java.util.concurrent.ScheduledThreadPoolExecutor, not by Flink:
{code:java}
java.lang.IllegalArgumentException: null
    at java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:565) ~[?:1.8.0_351]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.<init>(SourceCoordinator.java:191) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider.getCoordinator(SourceCoordinatorProvider.java:92) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.createNewInternalCoordinator(RecreateOnResetOperatorCoordinator.java:333) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:59) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.<init>(RecreateOnResetOperatorCoordinator.java:42) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:201) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$Provider.create(RecreateOnResetOperatorCoordinator.java:195) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:529) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:494) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:365) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:208) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist_2.12-1.16.1.jar:1.16.1]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) [?:1.8.0_351]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_351]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_351]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_351]
{code}
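The JDK behaviour at the top of this trace can be reproduced in isolation. The sketch below does not use Flink at all. It assumes, based on the first two frames, that the configured updateInterval ends up as the period argument of scheduleAtFixedRate. The class and method names are hypothetical.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ZeroPeriodSketch {

    // Returns true if the JDK refuses to schedule a fixed-rate task with this period.
    static boolean isRejected(long periodMillis) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        try {
            // scheduleAtFixedRate is documented to throw IllegalArgumentException
            // when the period is less than or equal to zero.
            executor.scheduleAtFixedRate(() -> { }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        } finally {
            // Cancel anything that was scheduled so the JVM can exit.
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("period 0 rejected: " + isRejected(0L));
        System.out.println("period 1000 rejected: " + isRejected(1000L));
    }
}
```

Because the exception is raised without a message, the user sees only "IllegalArgumentException: null" and has no hint that updateInterval is the cause.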
Therefore, I believe it's necessary to validate these two parameters to ensure
that exceptions are thrown on the client side to alert the user.
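A minimal sketch of what such a client-side check might look like is given below. It is not the actual Flink change: the class, the method names and the error messages are hypothetical. It also assumes that both values are supplied as java.time.Duration and are later used as millisecond values. Whether a drift of exactly zero should be accepted is a separate decision. Here it is allowed.

```java
import java.time.Duration;

final class AlignmentParamsCheckSketch {

    // Hypothetical validation that could run when the user configures alignment.
    static void validate(Duration maxAllowedWatermarkDrift, Duration updateInterval) {
        if (maxAllowedWatermarkDrift == null || maxAllowedWatermarkDrift.isNegative()) {
            throw new IllegalArgumentException(
                    "maxAllowedWatermarkDrift must not be negative, but was " + maxAllowedWatermarkDrift);
        }
        // Assumption: the interval is used as a millisecond period, so a positive
        // sub-millisecond duration would still truncate to 0 and must be rejected.
        if (updateInterval == null || updateInterval.toMillis() <= 0) {
            throw new IllegalArgumentException(
                    "updateInterval must be at least 1 ms, but was " + updateInterval);
        }
    }

    // Convenience wrapper for trying out combinations without handling exceptions.
    static boolean isAccepted(Duration maxAllowedWatermarkDrift, Duration updateInterval) {
        try {
            validate(maxAllowedWatermarkDrift, updateInterval);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isAccepted(Duration.ofSeconds(20), Duration.ofSeconds(1)));
        System.out.println(isAccepted(Duration.ofSeconds(-20), Duration.ofSeconds(1)));
        System.out.println(isAccepted(Duration.ofSeconds(20), Duration.ZERO));
    }
}
```

Failing at configuration time with a message that names the parameter would replace both the silent hang and the message-less JDK exception described above.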
!image-2024-04-11-20-12-29-951.png!