chengc created ZEPPELIN-4809:
--------------------------------

             Summary: Zeppelin FlinkInterpreterLauncher not work in Zeppelin 
cluster mode
                 Key: ZEPPELIN-4809
                 URL: https://issues.apache.org/jira/browse/ZEPPELIN-4809
             Project: Zeppelin
          Issue Type: Bug
          Components: flink
    Affects Versions: 0.9.0
            Reporter: chengc


When using Zeppelin in cluster mode, I found FLINK_CONF_DIR , FLINK_LIB_DIR and 
FLINK_PLUGINS_DIR are not correct, it leads to some exceptions when submitting 
the Flink SQL job on Yarn. I guess FlinkInterpreterLauncher is not working in 
Zeppelin cluster mode.

I must set FLINK_CONF_DIR/FLINK_LIB_DIR/FLINK_PLUGINS_DIR for the flink 
interpreter, then everything is right.

 

exceptions as follows:

1. not set *FLINK_PLUGINS_DIR*

_org.apache.zeppelin.interpreter.InterpreterException: 
java.lang.RuntimeException: java.io.IOException: Cannot find any jar files for 
plugin in directory [plugins/Launcher]. Please provide the jar files for the 
plugin or delete the directory. at 
org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:76)
 at 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668)
 at 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:577)
 at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at 
org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
 at 
org.apache.zeppelin.scheduler.FIFOScheduler.lambda$runJobInScheduler$0(FIFOScheduler.java:39)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.RuntimeException: 
java.io.IOException: Cannot find any jar files for plugin in directory 
[plugins/Launcher]. Please provide the jar files for the plugin or delete the 
directory. at 
org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199) at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:76)
 at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) 
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) at 
java.util.Iterator.forEachRemaining(Iterator.java:116) at 
java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
 at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at 
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at 
org.apache.flink.core.plugin.DirectoryBasedPluginFinder.findPlugins(DirectoryBasedPluginFinder.java:77)
 at 
org.apache.flink.core.plugin.PluginUtils.createPluginManagerFromRootFolder(PluginUtils.java:45)
 at 
org.apache.flink.core.plugin.PluginUtils.createPluginManagerFromRootFolder(PluginUtils.java:38)
 at org.apache.flink.client.cli.CliFrontend.<init>(CliFrontend.java:130) at 
org.apache.flink.client.cli.CliFrontend.<init>(CliFrontend.java:119) at 
org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:238)
 at 
org.apache.flink.api.scala.FlinkShell$.createYarnClusterIfNeededAndGetConfig(FlinkShell.scala:219)
 at 
org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:208)
 at 
org.apache.zeppelin.flink.FlinkScalaInterpreter.liftedTree1$1(FlinkScalaInterpreter.scala:193)
 at 
org.apache.zeppelin.flink.FlinkScalaInterpreter.open(FlinkScalaInterpreter.scala:192)
 at org.apache.zeppelin.flink.FlinkInterpreter.open(FlinkInterpreter.java:53) 
at 
org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70)
 ... 8 more_

 

2. not set *FLINK_CONF_DIR*

_org.apache.zeppelin.interpreter.InterpreterException: 
org.apache.flink.configuration.IllegalConfigurationException: The given 
configuration directory name '' (/home/test/zeppelin-0.9.0-preview1-bin-all) 
does not describe an existing directory. at 
org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:76)
 at 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668)
 at 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:577)
 at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at 
org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
 at 
org.apache.zeppelin.scheduler.FIFOScheduler.lambda$runJobInScheduler$0(FIFOScheduler.java:39)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) Caused by: 
org.apache.flink.configuration.IllegalConfigurationException: The given 
configuration directory name '' (/home/jm/zeppelin-0.9.0-preview1-bin-all) does 
not describe an existing directory. at 
org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:115)
 at 
org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:94)
 at 
org.apache.zeppelin.flink.FlinkScalaInterpreter.open(FlinkScalaInterpreter.scala:120)
 at org.apache.zeppelin.flink.FlinkInterpreter.open(FlinkInterpreter.java:53) 
at 
org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70)
 ... 8 more_

 

3. not set __ *FLINK_LIB_DIR*

*exceptions in flink job:*
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not 
instantiate outputs in order.
    at 
org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:428)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1456)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriterDelegate(StreamTask.java:1440)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:282)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:266)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:246)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:239)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:229)
    at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:63)
    at sun.reflect.GeneratedConstructorAccessor7.newInstance(Unknown Source)
    at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
    at 
org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1300)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:685)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.table.runtime.keyselector.BinaryRowKeySelector
 

*..*



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to