[ https://issues.apache.org/jira/browse/FLINK-35212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840036#comment-17840036 ]
Wei Yuan commented on FLINK-35212:
----------------------------------

After debugging this test case, I found a "Native Library /root/miniconda3/lib/python3.10/site-packages/pemja_core.cpython-310-x86_64-linux-gnu.so already loaded in another classloader" error thrown from pemja when the TaskManager loads the shared libraries for the second time.

[~hxbks2ks], can the component owner fix it? Or should we solve it in Flink? It looks like a problem that other Java projects commonly run into as well.

> PyFlink thread mode process just can run once in standalonesession mode
> -----------------------------------------------------------------------
>
> Key: FLINK-35212
> URL: https://issues.apache.org/jira/browse/FLINK-35212
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Environment: Python 3.10.14
> PyFlink==1.18.1
> openjdk version "11.0.21" 2023-10-17 LTS
> OpenJDK Runtime Environment (Red_Hat-11.0.21.0.9-1.el7_9) (build 11.0.21+9-LTS)
> OpenJDK 64-Bit Server VM (Red_Hat-11.0.21.0.9-1.el7_9) (build 11.0.21+9-LTS, mixed mode, sharing)
> Reporter: Wei Yuan
> Priority: Critical
>
> {code:python}
> from pyflink.common.types import Row
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.common import Types, WatermarkStrategy, Configuration
> from pyflink.table import EnvironmentSettings, TableEnvironment
> from pyflink.table import StreamTableEnvironment, Schema
> from pyflink.datastream.functions import ProcessFunction, MapFunction
> from pyflink.common.time import Instant
>
> # init task env
> config = Configuration()
> config.set_string("python.execution-mode", "thread")
> # config.set_string("python.execution-mode", "process")
> config.set_string("python.client.executable", "/root/miniconda3/bin/python3")
> config.set_string("python.executable", "/root/miniconda3/bin/python3")
>
> env = StreamExecutionEnvironment.get_execution_environment(config)
> table_env = StreamTableEnvironment.create(env)
>
> # create a table from in-memory elements and register it as a temporary view
> table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')]).alias("id", "content")
> table_env.create_temporary_view("test_table", table)
>
> result_table = table_env.sql_query("select *, NOW() as dt from test_table")
> result_ds = table_env.to_data_stream(result_table)
>
> # def test_func(row):
> #     return row
> # result_ds.map(test_func).print()
> result_ds.print()
>
> env.execute()
> {code}
> Start a standalone session cluster with the following command:
> {code:bash}
> /root/miniconda3/lib/python3.10/site-packages/pyflink/bin/bin/start-cluster.sh
> {code}
> The first submission of the thread-mode job finishes successfully:
> {code:bash}
> /root/miniconda3/lib/python3.10/site-packages/pyflink/bin/flink run -py bug.py
> {code}
> Submitting the job a second time with the same command fails with the following error:
> {code:java}
> Job has been submitted with JobID a4f2728199277bba0500796f7925fa26
> Traceback (most recent call last):
>   File "/home/disk1/bug.py", line 34, in <module>
>     env.execute()
>   File "/root/miniconda3/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py", line 773, in execute
>     return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
>   File "/root/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", line 1322, in __call__
>     return_value = get_return_value(
>   File "/root/miniconda3/lib/python3.10/site-packages/pyflink/util/exceptions.py", line 146, in deco
>     return f(*a, **kw)
>   File "/root/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
>     raise Py4JJavaError(
> py4j.protocol.Py4JJavaError: An error occurred while calling o7.execute.
> : java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: a4f2728199277bba0500796f7925fa26)
>     at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>     at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
>     at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:171)
>     at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:122)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: a4f2728199277bba0500796f7925fa26)
>     at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130)
>     at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>     at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079)
>     at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302)
>     at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>     at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>     at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079)
>     at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$33(RestClusterClient.java:794)
>     at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>     at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>     at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079)
>     at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302)
>     at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>     at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>     at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
>     at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085)
>     at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     ... 1 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>     at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:128)
>     ... 23 more
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
>     at jdk.internal.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
>     at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
>     at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>     at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>     at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
>     at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
>     at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
>     at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
>     at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
>     at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
>     at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
>     at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
>     at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
>     at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
>     at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
>     at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
>     at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> Caused by: java.lang.Error: java.lang.UnsatisfiedLinkError: 'void pemja.core.PythonInterpreter$MainInterpreter.initialize()'
>     at pemja.core.PythonInterpreter$MainInterpreter.initialize(PythonInterpreter.java:429)
>     at pemja.core.PythonInterpreter.initialize(PythonInterpreter.java:145)
>     at pemja.core.PythonInterpreter.<init>(PythonInterpreter.java:46)
>     at org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedPythonFunctionOperator.open(AbstractEmbeddedPythonFunctionOperator.java:72)
>     at org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedDataStreamPythonFunctionOperator.open(AbstractEmbeddedDataStreamPythonFunctionOperator.java:88)
>     at org.apache.flink.streaming.api.operators.python.embedded.AbstractOneInputEmbeddedPythonFunctionOperator.open(AbstractOneInputEmbeddedPythonFunctionOperator.java:68)
>     at org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonProcessOperator.open(EmbeddedPythonProcessOperator.java:67)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.UnsatisfiedLinkError: 'void pemja.core.PythonInterpreter$MainInterpreter.initialize()'
>     at pemja.core.PythonInterpreter$MainInterpreter.initialize(Native Method)
>     at pemja.core.PythonInterpreter$MainInterpreter.access$100(PythonInterpreter.java:332)
>     at pemja.core.PythonInterpreter$MainInterpreter$1.run(PythonInterpreter.java:400)
> org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
>     at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
>     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
>     at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
>     at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>     at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
> Caused by: java.lang.RuntimeException: Python process exits with code: 1
>     at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
>     ... 14 more
> {code}
> I suspect something goes wrong in the TaskManager process because the Python and pemja shared libraries were already loaded by the first job.
>
> I think the thread mode of PyFlink will be unusable in a standalone session cluster until this issue is resolved, so I have set the priority to Critical. Please feel free to change it if you disagree.
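A toy Python model can make the mechanism suspected in the comment above concrete. The JVM allows a given native library to be loaded by only one classloader and reports a violation with the "already loaded in another classloader" message seen here. The sketch assumes (not verified in this issue) that each job submitted to the standalone session cluster gets a fresh user-code classloader, that pemja_core is loaded through it, and that the TaskManager JVM outlives the job. `NativeLibraryTable`, `run_job`, and the loader names are hypothetical; this is not Flink, pemja, or JVM code, and it ignores that the JVM unloads a library once its owning classloader is garbage-collected.

```python
"""Toy model of the JVM rule behind the pemja error (NOT real JVM/Flink/pemja code).

Assumptions (hypothetical, for illustration only):
- one long-lived TaskManager JVM serves every job in the session cluster;
- each submitted job gets its own user-code classloader;
- pemja_core is loaded through that per-job classloader.
Library unloading after the first classloader is garbage-collected is ignored.
"""


class UnsatisfiedLinkError(Exception):
    """Stand-in for java.lang.UnsatisfiedLinkError."""


class NativeLibraryTable:
    """Per-JVM record of which classloader owns each native library."""

    def __init__(self):
        self._owners = {}  # absolute library path -> name of the owning classloader

    def load(self, path, loader):
        owner = self._owners.get(path)
        if owner is None:
            # First load binds the library to this classloader.
            self._owners[path] = loader
        elif owner != loader:
            # A different classloader tries to load the same library: rejected.
            raise UnsatisfiedLinkError(
                f"Native Library {path} already loaded in another classloader"
            )
        # owner == loader: loading again from the same classloader is a no-op.


def run_job(jvm, job_id, lib_path):
    """Simulate one job opening an embedded (thread-mode) Python operator."""
    loader = f"user-code-classloader-{job_id}"  # hypothetical: fresh loader per job
    try:
        jvm.load(lib_path, loader)
        return "FINISHED"
    except UnsatisfiedLinkError as err:
        return f"FAILED: {err}"


PEMJA_LIB = "/root/miniconda3/lib/python3.10/site-packages/pemja_core.cpython-310-x86_64-linux-gnu.so"

taskmanager_jvm = NativeLibraryTable()  # the TaskManager process outlives both jobs
print(run_job(taskmanager_jvm, 1, PEMJA_LIB))
print(run_job(taskmanager_jvm, 2, PEMJA_LIB))
```

Running it prints `FINISHED` for the first job and a `FAILED: Native Library ... already loaded in another classloader` line for the second, mirroring the run-once behaviour reported here. Ownership is keyed by the library's absolute path, which is what the error message reports.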