[ 
https://issues.apache.org/jira/browse/FLINK-35212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840036#comment-17840036
 ] 

Wei Yuan commented on FLINK-35212:
----------------------------------

After debugging this test case, I found that pemja throws a "Native Library
/root/miniconda3/lib/python3.10/site-packages/pemja_core.cpython-310-x86_64-linux-gnu.so
already loaded in another classloader" error when the TaskManager loads the
shared libraries for the second time.
 
Can the component owner fix it? [~hxbks2ks]
Or should we solve it in Flink? It looks like a common problem in other Java
projects as well.
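
To illustrate the error above: the JVM allows a given native library to be
loaded by only one class loader at a time, and a session cluster normally
creates a new user-code class loader for each submitted job. The sketch below
is a toy Python model of that rule, not pemja or Flink code. The names
NativeLibraryRegistry and submit_jobs, the loader labels, and the shortened
library name "pemja_core.so" are all hypothetical. It also assumes that pemja's
library is loaded through the per-job class loader, which the error message
suggests but which I have not verified.

```python
class UnsatisfiedLinkError(Exception):
    """Stand-in for java.lang.UnsatisfiedLinkError (illustration only)."""


class NativeLibraryRegistry:
    """Toy model of the JVM-wide bookkeeping of loaded native libraries."""

    def __init__(self):
        # Maps a library path to the label of the class loader that owns it.
        self._owners = {}

    def load(self, path, class_loader):
        owner = self._owners.get(path)
        if owner is None:
            # First load in this JVM: the requesting class loader becomes owner.
            self._owners[path] = class_loader
            return "loaded"
        if owner == class_loader:
            # The same class loader asking again is harmless.
            return "already loaded"
        # A different class loader asks for a library that is still owned.
        raise UnsatisfiedLinkError(
            f"Native Library {path} already loaded in another classloader"
        )


def submit_jobs(job_class_loaders, path="pemja_core.so"):
    """Simulate jobs running one after another in a single TaskManager JVM."""
    registry = NativeLibraryRegistry()  # lives as long as the JVM process
    outcomes = []
    for loader in job_class_loaders:
        try:
            outcomes.append(registry.load(path, loader))
        except UnsatisfiedLinkError as err:
            outcomes.append(f"error: {err}")
    return outcomes


if __name__ == "__main__":
    # Two jobs, each with its own class loader: the second load fails.
    print(submit_jobs(["job-1-loader", "job-2-loader"]))
    # Two jobs sharing one class loader: both loads succeed.
    print(submit_jobs(["shared-loader", "shared-loader"]))
```

In this model the second job fails only because it arrives with a different
class loader while the TaskManager process is still alive. That matches the
first submission succeeding and the second one failing, and it hints that
loading the library from a class loader shared across jobs might avoid the
error.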

> PyFlink thread mode process just can run once in standalonesession mode
> -----------------------------------------------------------------------
>
>                 Key: FLINK-35212
>                 URL: https://issues.apache.org/jira/browse/FLINK-35212
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>         Environment: Python 3.10.14
> PyFlink==1.18.1
> openjdk version "11.0.21" 2023-10-17 LTS
> OpenJDK Runtime Environment (Red_Hat-11.0.21.0.9-1.el7_9) (build 
> 11.0.21+9-LTS)
> OpenJDK 64-Bit Server VM (Red_Hat-11.0.21.0.9-1.el7_9) (build 11.0.21+9-LTS, 
> mixed mode, sharing)
>            Reporter: Wei Yuan
>            Priority: Critical
>
> {code:java}
> from pyflink.common.types import Row
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.common import Types, WatermarkStrategy, Configuration
> from pyflink.table import EnvironmentSettings, TableEnvironment
> from pyflink.table import StreamTableEnvironment, Schema
> from pyflink.datastream.functions import ProcessFunction, MapFunction
> from pyflink.common.time import Instant
> # init task env
> config = Configuration()
> config.set_string("python.execution-mode", "thread")
> # config.set_string("python.execution-mode", "process")
> config.set_string("python.client.executable", "/root/miniconda3/bin/python3")
> config.set_string("python.executable", "/root/miniconda3/bin/python3")
> env = StreamExecutionEnvironment.get_execution_environment(config)
> table_env = StreamTableEnvironment.create(env)
> # create a table from in-memory elements
> table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')]).alias("id", 
> "content")
> table_env.create_temporary_view("test_table", table)
> result_table = table_env.sql_query("select *, NOW() as dt from test_table")
> result_ds = table_env.to_data_stream(result_table)
> # def test_func(row):
> #     return row
> # result_ds.map(test_func).print()
> result_ds.print()
> env.execute()
> {code}
> Start a standalone session cluster with the command:
> {code:java}
> /root/miniconda3/lib/python3.10/site-packages/pyflink/bin/bin/start-cluster.sh{code}
> Submit the thread mode job for the first time; this job finishes successfully.
> {code:java}
> /root/miniconda3/lib/python3.10/site-packages/pyflink/bin/flink run -py 
> bug.py {code}
> Submit the job a second time with the same command, and an error occurs:
> {code:java}
> Job has been submitted with JobID a4f2728199277bba0500796f7925fa26
> Traceback (most recent call last):
>   File "/home/disk1/bug.py", line 34, in <module>
>     env.execute()
>   File 
> "/root/miniconda3/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py",
>  line 773, in execute
>     return 
> JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
>   File "/root/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py", 
> line 1322, in __call__
>     return_value = get_return_value(
>   File 
> "/root/miniconda3/lib/python3.10/site-packages/pyflink/util/exceptions.py", 
> line 146, in deco
>     return f(*a, **kw)
>   File "/root/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line 
> 326, in get_return_value
>     raise Py4JJavaError(
> py4j.protocol.Py4JJavaError: An error occurred while calling o7.execute.
> : java.util.concurrent.ExecutionException: 
> org.apache.flink.client.program.ProgramInvocationException: Job failed 
> (JobID: a4f2728199277bba0500796f7925fa26)
>         at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>         at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
>         at 
> org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:171)
>         at 
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:122)
>         at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>         at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>         at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
>         at 
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>         at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>         at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>         at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
> failed (JobID: a4f2728199277bba0500796f7925fa26)
>         at 
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130)
>         at 
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
>         at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>         at 
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079)
>         at 
> org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302)
>         at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>         at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>         at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>         at 
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079)
>         at 
> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$33(RestClusterClient.java:794)
>         at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>         at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>         at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>         at 
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079)
>         at 
> org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302)
>         at 
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>         at 
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>         at 
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>         at 
> java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
>         at 
> java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085)
>         at 
> java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         ... 1 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>         at 
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
>         at 
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:128)
>         ... 23 more
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
>         at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
>         at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
>         at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)
>         at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
>         at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
>         at 
> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
>         at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
>         at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
>         at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
>         at jdk.internal.reflect.GeneratedMethodAccessor16.invoke(Unknown 
> Source)
>         at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>         at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
>         at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>         at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
>         at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
>         at 
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
>         at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
>         at 
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
>         at 
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
>         at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>         at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>         at 
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>         at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
>         at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
>         at 
> org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
>         at 
> org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
>         at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
>         at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
>         at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
>         at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
>         at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
>         at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
>         at 
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
>         at 
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
>         at 
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> Caused by: java.lang.Error: java.lang.UnsatisfiedLinkError: 'void 
> pemja.core.PythonInterpreter$MainInterpreter.initialize()'
>         at 
> pemja.core.PythonInterpreter$MainInterpreter.initialize(PythonInterpreter.java:429)
>         at pemja.core.PythonInterpreter.initialize(PythonInterpreter.java:145)
>         at pemja.core.PythonInterpreter.<init>(PythonInterpreter.java:46)
>         at 
> org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedPythonFunctionOperator.open(AbstractEmbeddedPythonFunctionOperator.java:72)
>         at 
> org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedDataStreamPythonFunctionOperator.open(AbstractEmbeddedDataStreamPythonFunctionOperator.java:88)
>         at 
> org.apache.flink.streaming.api.operators.python.embedded.AbstractOneInputEmbeddedPythonFunctionOperator.open(AbstractOneInputEmbeddedPythonFunctionOperator.java:68)
>         at 
> org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonProcessOperator.open(EmbeddedPythonProcessOperator.java:67)
>         at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
>         at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>         at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.UnsatisfiedLinkError: 'void 
> pemja.core.PythonInterpreter$MainInterpreter.initialize()'
>         at pemja.core.PythonInterpreter$MainInterpreter.initialize(Native 
> Method)
>         at 
> pemja.core.PythonInterpreter$MainInterpreter.access$100(PythonInterpreter.java:332)
>         at 
> pemja.core.PythonInterpreter$MainInterpreter$1.run(PythonInterpreter.java:400)
> org.apache.flink.client.program.ProgramAbortException: 
> java.lang.RuntimeException: Python process exits with code: 1
>         at 
> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
>         at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>         at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>         at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>         at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
>         at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
>         at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
>         at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
>         at 
> org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
>         at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>         at 
> org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
>         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
> Caused by: java.lang.RuntimeException: Python process exits with code: 1
>         at 
> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
>         ... 14 more {code}
> I guess something goes wrong in the TaskManager process once the Python and 
> pemja shared libraries have already been loaded the first time. 
>  
> I think the thread mode of PyFlink will not be usable in a standalone 
> session cluster unless this issue is resolved, so I have set the priority to 
> Critical. Please feel free to change it if you have a different opinion.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
