[
https://issues.apache.org/jira/browse/FLINK-31168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748808#comment-17748808
]
Matthias Pohl commented on FLINK-31168:
---------------------------------------
the suspicion is now that JDK17 prevents the test from destroying the process:
{code}
01:09:49,250 [ main] ERROR
org.apache.flink.runtime.testutils.TestJvmProcess [] - Failed to
forcibly destroy process
java.lang.reflect.InaccessibleObjectException: Unable to make public
java.lang.Process java.lang.ProcessImpl.destroyForcibly() accessible: module
java.base does not "opens java.lang" to unnamed module @19dc67c2
at
java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
~[?:?]
at
java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
~[?:?]
at java.lang.reflect.Method.checkCanSetAccessible(Method.java:199)
~[?:?]
at java.lang.reflect.Method.setAccessible(Method.java:193) ~[?:?]
at
org.apache.flink.runtime.testutils.TestJvmProcess.destroy(TestJvmProcess.java:214)
~[flink-runtime-1.18-SNAPSHOT-tests.jar:1.18-SNAPSHOT]
at
org.apache.flink.test.recovery.JobManagerHAProcessFailureRecoveryITCase.testDispatcherProcessFailure(JobManagerHAProcessFailureRecoveryITCase.java:367)
~[test-classes/:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:?]
[...]
{code}
The {{TestJvmProcess}} class actually tries to destroy the process forcefully
first through reflection (see
[apache/flink:org.apache.flink.runtime.testutils.TestJvmProcess:214ff|https://github.com/apache/flink/blob/2940c02c986e3d70708187091bf006806bb90dff/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java#L214])
and calls {{destroy}} afterwards if the forced approach didn't work without
waiting for the process to finish. That could mean that the process gets
destroyed eventually but has enough time to finish the job in the first run
(i.e. cleaning the jobgraph up at the end) before the second JM process is
started.
Two interesting observations, though:
* Running the test under JDK17 locally multiple times seems to create daemon
child processes under the intellij process (which means that some process is
still not properly destroyed)
* The dispatcher logs of the first run don't reveal the finishing of the job. I
have to check whether the {{Process}} class stops sending the logs to the pipe
before triggering the process destruction (that would explain that the logs are
not complete)
> JobManagerHAProcessFailureRecoveryITCase failed due to job not being found
> --------------------------------------------------------------------------
>
> Key: FLINK-31168
> URL: https://issues.apache.org/jira/browse/FLINK-31168
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.15.3, 1.16.1, 1.18.0
> Reporter: Matthias Pohl
> Assignee: Matthias Pohl
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.18.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=46342&view=logs&j=b0a398c0-685b-599c-eb57-c8c2a771138e&t=747432ad-a576-5911-1e2a-68c6bedc248a&l=12706
> We see this build failure because a job couldn't be found:
> {code}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException:
> java.lang.RuntimeException: Error while waiting for job to be initialized
> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:319)
> at
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1061)
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:958)
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942)
> at
> org.apache.flink.test.recovery.JobManagerHAProcessFailureRecoveryITCase.testJobManagerFailure(JobManagerHAProcessFailureRecoveryITCase.java:235)
> at
> org.apache.flink.test.recovery.JobManagerHAProcessFailureRecoveryITCase$4.run(JobManagerHAProcessFailureRecoveryITCase.java:336)
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.RuntimeException: Error while waiting for job to be initialized
> at
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
> at
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
> at
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
> ... 4 more
> Caused by: java.lang.RuntimeException: Error while waiting for job to be
> initialized
> at
> org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:160)
> at
> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:82)
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
> at
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> at
> java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:479)
> at
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
> at
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
> at
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
> at
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
> at
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException:
> [org.apache.flink.runtime.rest.NotFoundException: Job
> 865dcd87f4828dbeb3d93eb52e2636b1 not found
> at
> org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$1(AbstractExecutionGraphHandler.java:99)
> at
> java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
> at
> java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
> at
> org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.lambda$getExecutionGraphInternal$0(DefaultExecutionGraphCache.java:109)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:252)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
> at
> org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1387)
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$1(ClassLoadingUtils.java:93)
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
> at
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45)
> at akka.dispatch.OnComplete.internal(Future.scala:299)
> at akka.dispatch.OnComplete.internal(Future.scala:297)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:25)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
> at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
> at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
> at
> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
> at
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
> at
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
> at
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
> at
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
> at
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could
> not find Flink job (865dcd87f4828dbeb3d93eb52e2636b1)
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.requestExecutionGraphInfo(Dispatcher.java:840)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:537)
> at akka.actor.Actor.aroundReceive$(Actor.scala:535)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
> at akka.actor.ActorCell.invoke(ActorCell.scala:548)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
> ... 5 more
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)