[
https://issues.apache.org/jira/browse/FLINK-35212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842295#comment-17842295
]
Wei Yuan commented on FLINK-35212:
----------------------------------
Hi Biao Geng, as you say, maybe the exception which I submitted caused by I
modified same files under python site-packages of pyflink when I try to solve
FLINK-35180. And I met the same exception in commit #0 with Java8 and Python
3.8.6 as you pasted .
Looking forward for your improvement for this.
> PyFlink thread mode process just can run once in standalonesession mode
> -----------------------------------------------------------------------
>
> Key: FLINK-35212
> URL: https://issues.apache.org/jira/browse/FLINK-35212
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Environment: Python 3.10.14
> PyFlink==1.18.1
> openjdk version "11.0.21" 2023-10-17 LTS
> OpenJDK Runtime Environment (Red_Hat-11.0.21.0.9-1.el7_9) (build
> 11.0.21+9-LTS)
> OpenJDK 64-Bit Server VM (Red_Hat-11.0.21.0.9-1.el7_9) (build 11.0.21+9-LTS,
> mixed mode, sharing)
> Reporter: Wei Yuan
> Priority: Critical
>
> {code:java}
> from pyflink.common.types import Row
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.common import Types, WatermarkStrategy, Configuration
> from pyflink.table import EnvironmentSettings, TableEnvironment
> from pyflink.table import StreamTableEnvironment, Schema
> from pyflink.datastream.functions import ProcessFunction, MapFunction
> from pyflink.common.time import Instant
> # init task env
> config = Configuration()
> config.set_string("python.execution-mode", "thread")
> # config.set_string("python.execution-mode", "process")
> config.set_string("python.client.executable", "/root/miniconda3/bin/python3")
> config.set_string("python.executable", "/root/miniconda3/bin/python3")
> env = StreamExecutionEnvironment.get_execution_environment(config)
> table_env = StreamTableEnvironment.create(env)
> # create a batch TableEnvironment
> table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')]).alias("id",
> "content")
> table_env.create_temporary_view("test_table", table)
> result_table = table_env.sql_query("select *, NOW() as dt from test_table")
> result_ds = table_env.to_data_stream(result_table)
> # def test_func(row):
> # return row
> # result_ds.map(test_func).print()
> result_ds.print()
> env.execute()
> {code}
> Start a standalone session mode cluster by command:
> {code:java}
> /root/miniconda3/lib/python3.10/site-packages/pyflink/bin/bin/start-cluster.sh{code}
> Submit thread mode job for the first time, this job will success fnished.
> {code:java}
> /root/miniconda3/lib/python3.10/site-packages/pyflink/bin/flink run -py
> bug.py {code}
> Use above command to submit job for the second time, an error occured:
> {code:java}
> Job has been submitted with JobID a4f2728199277bba0500796f7925fa26
> Traceback (most recent call last):
> File "/home/disk1/bug.py", line 34, in <module>
> env.execute()
> File
> "/root/miniconda3/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py",
> line 773, in execute
> return
> JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
> File "/root/miniconda3/lib/python3.10/site-packages/py4j/java_gateway.py",
> line 1322, in __call__
> return_value = get_return_value(
> File
> "/root/miniconda3/lib/python3.10/site-packages/pyflink/util/exceptions.py",
> line 146, in deco
> return f(*a, **kw)
> File "/root/miniconda3/lib/python3.10/site-packages/py4j/protocol.py", line
> 326, in get_return_value
> raise Py4JJavaError(
> py4j.protocol.Py4JJavaError: An error occurred while calling o7.execute.
> : java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: a4f2728199277bba0500796f7925fa26)
> at
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
> at
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
> at
> org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:171)
> at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:122)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
> at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
> failed (JobID: a4f2728199277bba0500796f7925fa26)
> at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130)
> at
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079)
> at
> org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079)
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$33(RestClusterClient.java:794)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2079)
> at
> org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
> at
> java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085)
> at
> java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> ... 1 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> at
> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:128)
> ... 23 more
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
> at
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
> at jdk.internal.reflect.GeneratedMethodAccessor16.invoke(Unknown
> Source)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
> at
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
> at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
> at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
> at
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
> at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
> at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
> at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
> at
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
> at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
> at
> org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
> at
> org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
> at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
> at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
> at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
> at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
> at
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
> at
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
> at
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
> at
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
> at
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> Caused by: java.lang.Error: java.lang.UnsatisfiedLinkError: 'void
> pemja.core.PythonInterpreter$MainInterpreter.initialize()'
> at
> pemja.core.PythonInterpreter$MainInterpreter.initialize(PythonInterpreter.java:429)
> at pemja.core.PythonInterpreter.initialize(PythonInterpreter.java:145)
> at pemja.core.PythonInterpreter.<init>(PythonInterpreter.java:46)
> at
> org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedPythonFunctionOperator.open(AbstractEmbeddedPythonFunctionOperator.java:72)
> at
> org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedDataStreamPythonFunctionOperator.open(AbstractEmbeddedDataStreamPythonFunctionOperator.java:88)
> at
> org.apache.flink.streaming.api.operators.python.embedded.AbstractOneInputEmbeddedPythonFunctionOperator.open(AbstractOneInputEmbeddedPythonFunctionOperator.java:68)
> at
> org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonProcessOperator.open(EmbeddedPythonProcessOperator.java:67)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.UnsatisfiedLinkError: 'void
> pemja.core.PythonInterpreter$MainInterpreter.initialize()'
> at pemja.core.PythonInterpreter$MainInterpreter.initialize(Native
> Method)
> at
> pemja.core.PythonInterpreter$MainInterpreter.access$100(PythonInterpreter.java:332)
> at
> pemja.core.PythonInterpreter$MainInterpreter$1.run(PythonInterpreter.java:400)org.apache.flink.client.program.ProgramAbortException:
> java.lang.RuntimeException: Python process exits with code: 1
> at
> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at
> org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
> Caused by: java.lang.RuntimeException: Python process exits with code: 1
> at
> org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
> ... 14 more {code}
> I guess maybe something wrong in taskmanager process when python and pemja
> shared libraries have already loaded in first time.
>
> I think the thread mode of PyFlink will not be available in the standalone
> session cluster if this issue is not resolved, so I have set the priority to
> critical. Please feel free to modify if have different opinions.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)