Omar Alvarez created FLINK-3616:
-----------------------------------

             Summary: Python API version inconsistencies
                 Key: FLINK-3616
                 URL: https://issues.apache.org/jira/browse/FLINK-3616
             Project: Flink
          Issue Type: Bug
          Components: Python API
    Affects Versions: 0.10.0
         Environment: Rocks 6.1 SP1, CentOS release 6.7 (2.6.32-573.7.1.el6.x86_64), java/oraclejdk/1.8.0_45, Python 2.7.9
            Reporter: Omar Alvarez

I'm trying to run a Python script that uses several packages installed in my Python 2.7.9 installation. The script fails with an import error because of discrepancies between the Python versions that the Flink environment uses.

I have two Python installations, 2.6 and 2.7. When I run my Flink Python script, everything goes fine and Python 2.7 is used until the taskmanager starts executing the job. Inside the job, Python 2.6 is used instead, ignoring the system-wide configuration that points to Python 2.7.

Is there any way to tell Flink where to find the desired Python executable?

This is the error:

{code:title=PyExample|borderStyle=solid}
Trajectory loading complete!
03/15/2016 21:17:17 Job execution switched to status RUNNING.
03/15/2016 21:17:17 DataSource (ValueSource)(1/1) switched to SCHEDULED
03/15/2016 21:17:17 DataSource (ValueSource)(1/1) switched to DEPLOYING
03/15/2016 21:17:17 DataSource (ValueSource)(1/1) switched to RUNNING
03/15/2016 21:17:17 MapPartition (PythonMap)(1/1) switched to SCHEDULED
03/15/2016 21:17:17 MapPartition (PythonMap)(1/1) switched to DEPLOYING
03/15/2016 21:17:17 DataSource (ValueSource)(1/1) switched to FINISHED
03/15/2016 21:17:17 MapPartition (PythonMap)(1/1) switched to RUNNING
03/15/2016 21:17:19 MapPartition (PythonMap)(1/1) switched to FAILED
java.lang.Exception: The user defined 'open()' method caused an exception: External process for task MapPartition (PythonMap) terminated prematurely.
Traceback (most recent call last):
  File "/tmp/flink-dist-cache-f108fb65-b75b-4b6a-bffd-4b6c5e3a3229/a2016bdf5ba366bbcbd68a6cc1ac537b/flink/plan.py", line 9, in <module>
    import MDAnalysis
ImportError: No module named MDAnalysis
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:484)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: External process for task MapPartition (PythonMap) terminated prematurely.
Traceback (most recent call last):
  File "/tmp/flink-dist-cache-f108fb65-b75b-4b6a-bffd-4b6c5e3a3229/a2016bdf5ba366bbcbd68a6cc1ac537b/flink/plan.py", line 9, in <module>
    import MDAnalysis
ImportError: No module named MDAnalysis
    at org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer.startPython(PythonStreamer.java:116)
    at org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer.setupProcess(PythonStreamer.java:58)
    at org.apache.flink.languagebinding.api.java.common.streaming.Streamer.open(Streamer.java:67)
    at org.apache.flink.languagebinding.api.java.python.functions.PythonMapPartition.open(PythonMapPartition.java:47)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
    ... 3 more
03/15/2016 21:17:19 Job execution switched to status FAILING.
03/15/2016 21:17:19 DataSink (TextSink)(1/1) switched to CANCELED
03/15/2016 21:17:19 Job execution switched to status FAILED.
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.runBlocking(Client.java:370)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:348)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:804)
    at org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.runPlan(PythonPlanBinder.java:117)
    at org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.main(PythonPlanBinder.java:85)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The user defined 'open()' method caused an exception: External process for task MapPartition (PythonMap) terminated prematurely.
Traceback (most recent call last):
  File "/tmp/flink-dist-cache-f108fb65-b75b-4b6a-bffd-4b6c5e3a3229/a2016bdf5ba366bbcbd68a6cc1ac537b/flink/plan.py", line 9, in <module>
    import MDAnalysis
ImportError: No module named MDAnalysis
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:484)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: External process for task MapPartition (PythonMap) terminated prematurely.
Traceback (most recent call last):
  File "/tmp/flink-dist-cache-f108fb65-b75b-4b6a-bffd-4b6c5e3a3229/a2016bdf5ba366bbcbd68a6cc1ac537b/flink/plan.py", line 9, in <module>
    import MDAnalysis
ImportError: No module named MDAnalysis
    at org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer.startPython(PythonStreamer.java:116)
    at org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer.setupProcess(PythonStreamer.java:58)
    at org.apache.flink.languagebinding.api.java.common.streaming.Streamer.open(Streamer.java:67)
    at org.apache.flink.languagebinding.api.java.python.functions.PythonMapPartition.open(PythonMapPartition.java:47)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
    ... 3 more

The exception above occurred while trying to run your command.
{code}
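
To confirm which interpreter each side actually runs under, a small check like the following could be placed at the top of the plan script, before the failing `import MDAnalysis`. This is only a sketch: `interpreter_report` and `can_import` are hypothetical helper names, and it assumes that whatever the external Python process writes to stderr can be seen somewhere, which may not hold in every setup.

```python
import sys


def interpreter_report():
    """Return a one-line description of the interpreter running this code."""
    version = ".".join(str(part) for part in sys.version_info[:3])
    return "python %s at %s" % (version, sys.executable)


def can_import(module_name):
    """Return True if module_name can be imported by this interpreter."""
    try:
        __import__(module_name)
        return True
    except ImportError:
        return False


if __name__ == "__main__":
    # Written to stderr so the diagnostic does not mix with regular stdout output.
    sys.stderr.write(interpreter_report() + "\n")
    sys.stderr.write("MDAnalysis importable: %s\n" % can_import("MDAnalysis"))
```

Comparing the reported executable path on the client with the one reported inside the job would show whether the two really differ, as the `ImportError` suggests.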