[
https://issues.apache.org/jira/browse/FLINK-38970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18054194#comment-18054194
]
Roman Khachatryan commented on FLINK-38970:
-------------------------------------------
The problem seems to be caused by the mainThreadExecutor implementation used
in the test: it is a direct executor, which means that callbacks in
`Execution#deploy` that are meant to run on the "main" thread are instead
executed on the IO/future executor.
For example:
{code:java}
final CompletableFuture<TaskDeploymentDescriptor> taskDeploymentDescriptorFuture =
        CompletableFuture.supplyAsync(
                        initOffloadedTaskRestoreRef(
                                // Passing a copy of the task restore from the
                                // main thread to the I/O executor in order to
                                // avoid synchronization issues
                                taskRestore,
                                maybeOffloadedTaskRestoreCleanupRef),
                        executor)
                // back to main thread because this accesses execution graph
                // internals
                .thenComposeAsync(
                        tryGetTaskDeploymentDescriptorForSlot(slot),
                        jobMasterMainThreadExecutor);
{code}
When I replace futureExecutor with `new DirectScheduledExecutorService()`, the
problem goes away.
Alternatively, waiting for the first future in the main thread also solves
the problem.
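
To illustrate the mechanism outside Flink, here is a minimal JDK-only sketch.
It is not the test's code: the class `DirectExecutorDemo`, the method
`callbackThread`, and the thread names "io-thread" and "main-thread" are
hypothetical stand-ins for the I/O executor and the job master main thread
executor. It shows that a callback chained with `thenComposeAsync` on a direct
executor runs on whichever thread hands it over, which is the I/O thread when
the first future completes after the callback was registered.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DirectExecutorDemo {

    /**
     * Chains a callback with thenComposeAsync and returns the name of the
     * thread that actually ran it.
     */
    static String callbackThread(boolean directMainExecutor, boolean waitForFirst) {
        ExecutorService io =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService main =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        // A direct executor runs the task on whichever thread calls execute().
        Executor mainThreadExecutor = directMainExecutor ? Runnable::run : main;
        CountDownLatch callbackRegistered = new CountDownLatch(1);
        try {
            CompletableFuture<String> first =
                    CompletableFuture.supplyAsync(
                            () -> {
                                if (!waitForFirst) {
                                    // Hold the first stage open until the callback
                                    // is registered, so the I/O thread completes it.
                                    awaitLatch(callbackRegistered);
                                }
                                return "descriptor";
                            },
                            io);
            if (waitForFirst) {
                // The first stage is already done when the callback is chained.
                first.join();
            }
            CompletableFuture<String> second =
                    first.thenComposeAsync(
                            ignored ->
                                    CompletableFuture.completedFuture(
                                            Thread.currentThread().getName()),
                            mainThreadExecutor);
            callbackRegistered.countDown();
            return second.join();
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    private static void awaitLatch(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Real single-threaded "main" executor: callback runs on main-thread.
        System.out.println(callbackThread(false, false));
        // Direct executor: callback runs on io-thread, the completing thread.
        System.out.println(callbackThread(true, false));
        // Direct executor, first future awaited: callback runs on the caller.
        System.out.println(callbackThread(true, true));
    }
}
```

Without the latch, the direct-executor case is a race between the I/O thread
completing the first future and the caller registering the callback, which
would be consistent with a test that fails only in some runs.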
> AdaptiveBatchSchedulerTest is unstable
> --------------------------------------
>
> Key: FLINK-38970
> URL: https://issues.apache.org/jira/browse/FLINK-38970
> Project: Flink
> Issue Type: Bug
> Components: Tests
> Affects Versions: 2.3.0
> Reporter: Roman Khachatryan
> Priority: Major
>
> When running locally, testAdaptiveBatchScheduler fails 1270 out of 3000 runs
> with
> {code:java}
> org.opentest4j.AssertionFailedError:
> expected: 10
> but was: -1
> Expected :10
> Actual :-1
> at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerTest.testAdaptiveBatchScheduler(AdaptiveBatchSchedulerTest.java:144)
> at java.base/java.lang.reflect.Method.invoke(Method.java:580)
> at java.base/java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:387)
> at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
> at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)
> at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)
> at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)
> at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)
> {code}
> With logging enabled, I see that some operations are executed on the wrong
> thread:
> {code:java}
> 7083 [pool-5116-thread-1] WARN org.apache.flink.runtime.rpc.MainThreadValidatorUtil [] - Violation of main thread constraint detected: expected <Thread[#5153,ForkJoinPool-2558-worker-1,5,main]> but running in <Thread[#5154,pool-5116-thread-1,5,main]>.
> java.lang.Exception: Violation of main thread constraint detected: expected <Thread[#5153,ForkJoinPool-2558-worker-1,5,main]> but running in <Thread[#5154,pool-5116-thread-1,5,main]>.
> at org.apache.flink.runtime.rpc.MainThreadValidatorUtil.isRunningInExpectedThread(MainThreadValidatorUtil.java:73) ~[classes/:?]
> at org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter$1.execute(ComponentMainThreadExecutorServiceAdapter.java:66) ~[test-classes/:?]
> at org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:64) ~[classes/:?]
> at org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.execute(ComponentMainThreadExecutorServiceAdapter.java:128) ~[test-classes/:?]
> at java.base/java.util.concurrent.CompletableFuture$UniCompletion.claim(CompletableFuture.java:572) ~[?:?]
> at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1147) ~[?:?]
> at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
> at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1773) ~[?:?]
> at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java) ~[?:?]
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
> at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317) ~[?:?]
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:?]
> at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
> at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
> {code}