Omar Alvarez created FLINK-3616:
-----------------------------------

             Summary: Python API version inconsistencies
                 Key: FLINK-3616
                 URL: https://issues.apache.org/jira/browse/FLINK-3616
             Project: Flink
          Issue Type: Bug
          Components: Python API
    Affects Versions: 0.10.0
         Environment: Rocks 6.1 SP1, CentOS release 6.7 
(2.6.32-573.7.1.el6.x86_64), java/oraclejdk/1.8.0_45, Python 2.7.9
            Reporter: Omar Alvarez


I'm trying to run a python script that uses several packages present in my 
python 2.7.9 installation. The script is failing with an import error due to 
discrepacies in the python versions that the flink environment uses. I have two 
python instalations 2.6 and 2.7. 

When I run my Flink python script everything goes fine, it uses python 2.7, 
until the taskmanager starts executing the job. The problem is that inside the 
job the python version used is the 2.6 ignoring the systemwide config that 
points to python 2.7.

Is there anyway to tell flink where to find the desired python version 
executable?

This is the error:

{code:title=PyExample|borderStyle=solid}
Trajectory loading complete!
03/15/2016 21:17:17     Job execution switched to status RUNNING.
03/15/2016 21:17:17     DataSource (ValueSource)(1/1) switched to SCHEDULED
03/15/2016 21:17:17     DataSource (ValueSource)(1/1) switched to DEPLOYING
03/15/2016 21:17:17     DataSource (ValueSource)(1/1) switched to RUNNING
03/15/2016 21:17:17     MapPartition (PythonMap)(1/1) switched to SCHEDULED
03/15/2016 21:17:17     MapPartition (PythonMap)(1/1) switched to DEPLOYING
03/15/2016 21:17:17     DataSource (ValueSource)(1/1) switched to FINISHED
03/15/2016 21:17:17     MapPartition (PythonMap)(1/1) switched to RUNNING
03/15/2016 21:17:19     MapPartition (PythonMap)(1/1) switched to FAILED
java.lang.Exception: The user defined 'open()' method caused an exception: 
External process for task MapPartition (PythonMap) terminated prematurely.
Traceback (most recent call last):
  File 
"/tmp/flink-dist-cache-f108fb65-b75b-4b6a-bffd-4b6c5e3a3229/a2016bdf5ba366bbcbd68a6cc1ac537b/flink/plan.py",
 line 9, in <module>
    import MDAnalysis
ImportError: No module named MDAnalysis
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:484)
        at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: External process for task MapPartition 
(PythonMap) terminated prematurely.
Traceback (most recent call last):
  File 
"/tmp/flink-dist-cache-f108fb65-b75b-4b6a-bffd-4b6c5e3a3229/a2016bdf5ba366bbcbd68a6cc1ac537b/flink/plan.py",
 line 9, in <module>
    import MDAnalysis
ImportError: No module named MDAnalysis
        at 
org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer.startPython(PythonStreamer.java:116)
        at 
org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer.setupProcess(PythonStreamer.java:58)
        at 
org.apache.flink.languagebinding.api.java.common.streaming.Streamer.open(Streamer.java:67)
        at 
org.apache.flink.languagebinding.api.java.python.functions.PythonMapPartition.open(PythonMapPartition.java:47)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
        ... 3 more

03/15/2016 21:17:19     Job execution switched to status FAILING.
03/15/2016 21:17:19     DataSink (TextSink)(1/1) switched to CANCELED
03/15/2016 21:17:19     Job execution switched to status FAILED.
org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Job execution failed.
        at org.apache.flink.client.program.Client.runBlocking(Client.java:370)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:348)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
        at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
        at 
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:804)
        at 
org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.runPlan(PythonPlanBinder.java:117)
        at 
org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.main(PythonPlanBinder.java:85)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
        at 
org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
        at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
        at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
        at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
        at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
        at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The user defined 'open()' method caused an 
exception: External process for task MapPartition (PythonMap) terminated 
prematurely.
Traceback (most recent call last):
  File 
"/tmp/flink-dist-cache-f108fb65-b75b-4b6a-bffd-4b6c5e3a3229/a2016bdf5ba366bbcbd68a6cc1ac537b/flink/plan.py",
 line 9, in <module>
    import MDAnalysis
ImportError: No module named MDAnalysis
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:484)
        at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: External process for task MapPartition 
(PythonMap) terminated prematurely.
Traceback (most recent call last):
  File 
"/tmp/flink-dist-cache-f108fb65-b75b-4b6a-bffd-4b6c5e3a3229/a2016bdf5ba366bbcbd68a6cc1ac537b/flink/plan.py",
 line 9, in <module>
    import MDAnalysis
ImportError: No module named MDAnalysis
        at 
org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer.startPython(PythonStreamer.java:116)
        at 
org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer.setupProcess(PythonStreamer.java:58)
        at 
org.apache.flink.languagebinding.api.java.common.streaming.Streamer.open(Streamer.java:67)
        at 
org.apache.flink.languagebinding.api.java.python.functions.PythonMapPartition.open(PythonMapPartition.java:47)
        at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
        ... 3 more

The exception above occurred while trying to run your command.
{code}





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to