chengc created ZEPPELIN-4809:
--------------------------------
Summary: Zeppelin FlinkInterpreterLauncher not work in Zeppelin
cluster mode
Key: ZEPPELIN-4809
URL: https://issues.apache.org/jira/browse/ZEPPELIN-4809
Project: Zeppelin
Issue Type: Bug
Components: flink
Affects Versions: 0.9.0
Reporter: chengc
When running Zeppelin in cluster mode, I found that FLINK_CONF_DIR, FLINK_LIB_DIR
and FLINK_PLUGINS_DIR are not set correctly, which causes exceptions when
submitting a Flink SQL job on YARN. My guess is that FlinkInterpreterLauncher
does not take effect in Zeppelin cluster mode.
As a workaround, I have to set FLINK_CONF_DIR, FLINK_LIB_DIR and
FLINK_PLUGINS_DIR explicitly for the flink interpreter; after that everything
works.
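A minimal sketch of the workaround, assuming the usual Flink distribution layout in which conf, lib and plugins sit directly under the Flink home directory. The helper names (flink_env, missing_dirs) and the /opt/flink fallback are hypothetical and not part of Zeppelin or Flink. The script only prints the three values to enter as flink interpreter properties and reports any that do not point at an existing directory.

```python
import os
from pathlib import Path

# Assumed layout of a Flink distribution: <flink_home>/conf, /lib, /plugins.
FLINK_SUBDIRS = {
    "FLINK_CONF_DIR": "conf",
    "FLINK_LIB_DIR": "lib",
    "FLINK_PLUGINS_DIR": "plugins",
}


def flink_env(flink_home):
    """Return the three variables as absolute paths under flink_home."""
    home = Path(flink_home).resolve()
    return {name: str(home / sub) for name, sub in FLINK_SUBDIRS.items()}


def missing_dirs(env):
    """Return the names of variables whose value is not an existing directory."""
    return [name for name, path in env.items() if not os.path.isdir(path)]


if __name__ == "__main__":
    # /opt/flink is only a placeholder; use the real Flink home on each node.
    env = flink_env(os.environ.get("FLINK_HOME", "/opt/flink"))
    for name, path in env.items():
        print(f"{name}={path}")
    print("missing:", missing_dirs(env))
```

Absolute paths are used on purpose: the relative and empty directory names in the traces below suggest the values were resolved against the Zeppelin working directory.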
The exceptions are as follows:
1. *FLINK_PLUGINS_DIR* not set:
org.apache.zeppelin.interpreter.InterpreterException: java.lang.RuntimeException: java.io.IOException: Cannot find any jar files for plugin in directory [plugins/Launcher]. Please provide the jar files for the plugin or delete the directory.
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:76)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:577)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
    at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
    at org.apache.zeppelin.scheduler.FIFOScheduler.lambda$runJobInScheduler$0(FIFOScheduler.java:39)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.io.IOException: Cannot find any jar files for plugin in directory [plugins/Launcher]. Please provide the jar files for the plugin or delete the directory.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:76)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.flink.core.plugin.DirectoryBasedPluginFinder.findPlugins(DirectoryBasedPluginFinder.java:77)
    at org.apache.flink.core.plugin.PluginUtils.createPluginManagerFromRootFolder(PluginUtils.java:45)
    at org.apache.flink.core.plugin.PluginUtils.createPluginManagerFromRootFolder(PluginUtils.java:38)
    at org.apache.flink.client.cli.CliFrontend.<init>(CliFrontend.java:130)
    at org.apache.flink.client.cli.CliFrontend.<init>(CliFrontend.java:119)
    at org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:238)
    at org.apache.flink.api.scala.FlinkShell$.createYarnClusterIfNeededAndGetConfig(FlinkShell.scala:219)
    at org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:208)
    at org.apache.zeppelin.flink.FlinkScalaInterpreter.liftedTree1$1(FlinkScalaInterpreter.scala:193)
    at org.apache.zeppelin.flink.FlinkScalaInterpreter.open(FlinkScalaInterpreter.scala:192)
    at org.apache.zeppelin.flink.FlinkInterpreter.open(FlinkInterpreter.java:53)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70)
    ... 8 more
2. *FLINK_CONF_DIR* not set:
org.apache.zeppelin.interpreter.InterpreterException: org.apache.flink.configuration.IllegalConfigurationException: The given configuration directory name '' (/home/test/zeppelin-0.9.0-preview1-bin-all) does not describe an existing directory.
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:76)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:577)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
    at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
    at org.apache.zeppelin.scheduler.FIFOScheduler.lambda$runJobInScheduler$0(FIFOScheduler.java:39)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: The given configuration directory name '' (/home/jm/zeppelin-0.9.0-preview1-bin-all) does not describe an existing directory.
    at org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:115)
    at org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:94)
    at org.apache.zeppelin.flink.FlinkScalaInterpreter.open(FlinkScalaInterpreter.scala:120)
    at org.apache.zeppelin.flink.FlinkInterpreter.open(FlinkInterpreter.java:53)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70)
    ... 8 more
3. *FLINK_LIB_DIR* not set:
*Exception in the Flink job:*
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
    at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:428)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1456)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriterDelegate(StreamTask.java:1440)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:282)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:246)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:239)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:229)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:63)
    at sun.reflect.GeneratedConstructorAccessor7.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
    at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1300)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:685)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.runtime.keyselector.BinaryRowKeySelector
*..*
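To narrow down cases 1 and 3, a small diagnostic along the following lines may help. It is only a sketch: both function names are hypothetical, and the jar-per-subdirectory rule is inferred from the error message in case 1, not taken from Flink's source. The first function lists plugin subdirectories that contain no jar file (which appears to be what trips on plugins/Launcher when the plugins path resolves against the Zeppelin directory). The second lists the jars in a lib directory that contain a given class, such as the BinaryRowKeySelector class reported missing in case 3.

```python
import zipfile
from pathlib import Path


def plugin_dirs_without_jars(plugins_root):
    """Return subdirectories of plugins_root holding no .jar file at any depth."""
    root = Path(plugins_root)
    if not root.is_dir():
        return []
    return sorted(
        str(sub)
        for sub in root.iterdir()
        if sub.is_dir() and not any(sub.rglob("*.jar"))
    )


def jars_containing_class(lib_dir, class_name):
    """Return names of jars directly under lib_dir that contain class_name."""
    # A class a.b.C is stored in a jar as the entry a/b/C.class.
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as zf:
                if entry in zf.namelist():
                    hits.append(jar.name)
        except zipfile.BadZipFile:
            # Skip files that carry a .jar suffix but are not valid archives.
            continue
    return hits
```

For example, calling jars_containing_class on the directory intended as FLINK_LIB_DIR with "org.apache.flink.table.runtime.keyselector.BinaryRowKeySelector" returns an empty list if no jar there provides the class. That would be consistent with the ClassNotFoundException above.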
--
This message was sent by Atlassian Jira
(v8.3.4#803005)