chengc created ZEPPELIN-4809: -------------------------------- Summary: Zeppelin FlinkInterpreterLauncher not work in Zeppelin cluster mode Key: ZEPPELIN-4809 URL: https://issues.apache.org/jira/browse/ZEPPELIN-4809 Project: Zeppelin Issue Type: Bug Components: flink Affects Versions: 0.9.0 Reporter: chengc
When using Zeppelin in cluster mode, I found FLINK_CONF_DIR , FLINK_LIB_DIR and FLINK_PLUGINS_DIR are not correct, it leads to some exceptions when submitting the Flink SQL job on Yarn. I guess FlinkInterpreterLauncher is not working in Zeppelin cluster mode. I must set FLINK_CONF_DIR/FLINK_LIB_DIR/FLINK_PLUGINS_DIR for the flink interpreter, then everything is right. exceptions as follows: 1. not set *FLINK_PLUGINS_DIR* _org.apache.zeppelin.interpreter.InterpreterException: java.lang.RuntimeException: java.io.IOException: Cannot find any jar files for plugin in directory [plugins/Launcher]. Please provide the jar files for the plugin or delete the directory. at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:76) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:577) at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130) at org.apache.zeppelin.scheduler.FIFOScheduler.lambda$runJobInScheduler$0(FIFOScheduler.java:39) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.RuntimeException: java.io.IOException: Cannot find any jar files for plugin in directory [plugins/Launcher]. Please provide the jar files for the plugin or delete the directory. at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199) at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:76) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) at java.util.Iterator.forEachRemaining(Iterator.java:116) at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) at org.apache.flink.core.plugin.DirectoryBasedPluginFinder.findPlugins(DirectoryBasedPluginFinder.java:77) at org.apache.flink.core.plugin.PluginUtils.createPluginManagerFromRootFolder(PluginUtils.java:45) at org.apache.flink.core.plugin.PluginUtils.createPluginManagerFromRootFolder(PluginUtils.java:38) at org.apache.flink.client.cli.CliFrontend.<init>(CliFrontend.java:130) at org.apache.flink.client.cli.CliFrontend.<init>(CliFrontend.java:119) at org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:238) at org.apache.flink.api.scala.FlinkShell$.createYarnClusterIfNeededAndGetConfig(FlinkShell.scala:219) at org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:208) at org.apache.zeppelin.flink.FlinkScalaInterpreter.liftedTree1$1(FlinkScalaInterpreter.scala:193) at org.apache.zeppelin.flink.FlinkScalaInterpreter.open(FlinkScalaInterpreter.scala:192) at org.apache.zeppelin.flink.FlinkInterpreter.open(FlinkInterpreter.java:53) at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70) ... 8 more_ 2. not set *FLINK_CONF_DIR* _org.apache.zeppelin.interpreter.InterpreterException: org.apache.flink.configuration.IllegalConfigurationException: The given configuration directory name '' (/home/test/zeppelin-0.9.0-preview1-bin-all) does not describe an existing directory. at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:76) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:577) at org.apache.zeppelin.scheduler.Job.run(Job.java:172) at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130) at org.apache.zeppelin.scheduler.FIFOScheduler.lambda$runJobInScheduler$0(FIFOScheduler.java:39) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flink.configuration.IllegalConfigurationException: The given configuration directory name '' (/home/jm/zeppelin-0.9.0-preview1-bin-all) does not describe an existing directory. at org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:115) at org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:94) at org.apache.zeppelin.flink.FlinkScalaInterpreter.open(FlinkScalaInterpreter.scala:120) at org.apache.zeppelin.flink.FlinkInterpreter.open(FlinkInterpreter.java:53) at org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:70) ... 8 more_ 3. not set __ *FLINK_LIB_DIR* *exceptions in flink job:* org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order. at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:428) at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1456) at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriterDelegate(StreamTask.java:1440) at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:282) at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:266) at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:246) at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:239) at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:229) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.<init>(SourceStreamTask.java:63) at sun.reflect.GeneratedConstructorAccessor7.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:422) at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1300) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:685) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.runtime.keyselector.BinaryRowKeySelector *..* -- This message was sent by Atlassian Jira (v8.3.4#803005)