This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 78c49e9 [ZEPPELIN-3986]. Cannot access any JAR in yarn cluster mode 78c49e9 is described below commit 78c49e923e6c6ac228d259b20d67217db1b4315d Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Fri Feb 15 21:00:25 2019 +0800 [ZEPPELIN-3986]. Cannot access any JAR in yarn cluster mode ### What is this PR for? User-specified jars are missing in yarn-cluster mode because we didn't detect the user jars correctly. This PR fixes the jar-detection logic in `BaseSparkScalaInterpreter`. ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? * https://jira.apache.org/jira/browse/ZEPPELIN-3986 ### How should this be tested? * A system integration test is added to SparkIntegrationTest; it covers the cases of `spark.jars` & `spark.jars.packages` ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3308 from zjffdu/ZEPPELIN-3986 and squashes the following commits: 49a59b2a1 [Jeff Zhang] [ZEPPELIN-3986]. 
Cannot access any JAR in yarn cluster mode --- .../zeppelin/spark/NewSparkInterpreterTest.java | 28 ---------------- .../zeppelin/spark/BaseSparkScalaInterpreter.scala | 38 ++++++++++++++-------- zeppelin-interpreter-integration/pom.xml | 12 +++++++ .../interpreter/integration/DummyClass.java | 21 ++++++++++++ .../zeppelin/integration/SparkIntegrationTest.java | 25 +++++++++++--- 5 files changed, 78 insertions(+), 46 deletions(-) diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java index 54835ae..7c7dad9 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java @@ -370,34 +370,6 @@ public class NewSparkInterpreterTest { interpretThread.join(); } - @Test - public void testDependencies() throws IOException, InterpreterException { - Properties properties = new Properties(); - properties.setProperty("spark.master", "local"); - properties.setProperty("spark.app.name", "test"); - properties.setProperty("zeppelin.spark.maxResult", "100"); - properties.setProperty("zeppelin.spark.useNew", "true"); - // disable color output for easy testing - properties.setProperty("zeppelin.spark.scala.color", "false"); - - // download spark-avro jar - URL website = new URL("http://repo1.maven.org/maven2/com/databricks/spark-avro_2.11/3.2.0/spark-avro_2.11-3.2.0.jar"); - ReadableByteChannel rbc = Channels.newChannel(website.openStream()); - File avroJarFile = new File("spark-avro_2.11-3.2.0.jar"); - FileOutputStream fos = new FileOutputStream(avroJarFile); - fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE); - - properties.setProperty("spark.jars", avroJarFile.getAbsolutePath()); - - interpreter = new SparkInterpreter(properties); - assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter); - 
interpreter.setInterpreterGroup(mock(InterpreterGroup.class)); - interpreter.open(); - - InterpreterResult result = interpreter.interpret("import com.databricks.spark.avro._", getInterpreterContext()); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - } - //TODO(zjffdu) This unit test will fail due to classpath issue, should enable it after the classpath issue is fixed. @Ignore public void testDepInterpreter() throws InterpreterException { diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala index 183dee6..cd241a8 100644 --- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala +++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala @@ -19,6 +19,8 @@ package org.apache.zeppelin.spark import java.io.File +import java.net.URLClassLoader +import java.nio.file.Paths import java.util.concurrent.atomic.AtomicInteger import org.apache.spark.sql.SQLContext @@ -35,6 +37,7 @@ import scala.util.control.NonFatal /** * Base class for different scala versions of SparkInterpreter. It should be * binary compatible between multiple scala versions. 
+ * * @param conf * @param depFiles */ @@ -86,6 +89,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, def interpret(code: String, context: InterpreterContext): InterpreterResult = { val originalOut = System.out + def _interpret(code: String): scala.tools.nsc.interpreter.Results.Result = { Console.withOut(interpreterOutput) { System.setOut(Console.out) @@ -236,7 +240,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, builder.getClass.getMethod("config", classOf[SparkConf]).invoke(builder, conf) if (conf.get("spark.sql.catalogImplementation", "in-memory").toLowerCase == "hive" - || conf.get("spark.useHiveContext", "false").toLowerCase == "true") { + || conf.get("spark.useHiveContext", "false").toLowerCase == "true") { val hiveSiteExisted: Boolean = Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null val hiveClassesPresent = @@ -370,20 +374,26 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, } protected def getUserJars(): Seq[String] = { - val sparkJars = conf.getOption("spark.jars").map(_.split(",")) - .map(_.filter(_.nonEmpty)).toSeq.flatten - val depJars = depFiles.asScala.filter(_.endsWith(".jar")) - // add zeppelin spark interpreter jar - val zeppelinInterpreterJarURL = getClass.getProtectionDomain.getCodeSource.getLocation - // zeppelinInterpreterJarURL might be a folder when under unit testing - val result = if (new File(zeppelinInterpreterJarURL.getFile).isDirectory) { - sparkJars ++ depJars - } else { - sparkJars ++ depJars ++ Seq(zeppelinInterpreterJarURL.getFile) + var classLoader = Thread.currentThread().getContextClassLoader + var extraJars = Seq.empty[String] + while (classLoader != null) { + if (classLoader.getClass.getCanonicalName == + "org.apache.spark.util.MutableURLClassLoader") { + extraJars = classLoader.asInstanceOf[URLClassLoader].getURLs() + // Check if the file exists. 
+ .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile } + // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it. + .filterNot { + u => Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect") + } + .map(url => url.toString).toSeq + classLoader = null + } else { + classLoader = classLoader.getParent + } } - conf.set("spark.jars", result.mkString(",")) - LOGGER.debug("User jar for spark repl: " + conf.get("spark.jars")) - result + LOGGER.debug("User jar for spark repl: " + extraJars.mkString(",")) + extraJars } protected def getUserFiles(): Seq[String] = { diff --git a/zeppelin-interpreter-integration/pom.xml b/zeppelin-interpreter-integration/pom.xml index 09c9710..6140925 100644 --- a/zeppelin-interpreter-integration/pom.xml +++ b/zeppelin-interpreter-integration/pom.xml @@ -95,6 +95,18 @@ </exclusions> </dependency> + <dependency> + <groupId>org.apache.maven</groupId> + <artifactId>maven-model</artifactId> + <version>3.0.3</version> + <exclusions> + <exclusion> + <groupId>org.codehaus.plexus</groupId> + <artifactId>plexus-utils</artifactId> + </exclusion> + </exclusions> + </dependency> + <!--test libraries--> <dependency> <groupId>junit</groupId> diff --git a/zeppelin-interpreter-integration/src/main/java/org/apache/zeppelin/interpreter/integration/DummyClass.java b/zeppelin-interpreter-integration/src/main/java/org/apache/zeppelin/interpreter/integration/DummyClass.java new file mode 100644 index 0000000..1df4618 --- /dev/null +++ b/zeppelin-interpreter-integration/src/main/java/org/apache/zeppelin/interpreter/integration/DummyClass.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.integration; + +public class DummyClass { +} diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java index 03a482d..94a6a40 100644 --- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java +++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java @@ -22,6 +22,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.maven.model.Model; +import org.apache.maven.model.io.xpp3.MavenXpp3Reader; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; @@ -29,12 +31,15 @@ import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.interpreter.InterpreterSettingManager; +import 
org.codehaus.plexus.util.xml.pull.XmlPullParserException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.FileReader; import java.io.IOException; import java.util.EnumSet; @@ -80,7 +85,14 @@ public abstract class SparkIntegrationTest { } } - private void testInterpreterBasics() throws IOException, InterpreterException { + private void testInterpreterBasics() throws IOException, InterpreterException, XmlPullParserException { + // add jars & packages for testing + InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark"); + sparkInterpreterSetting.setProperty("spark.jars.packages", "com.maxmind.geoip2:geoip2:2.5.0"); + MavenXpp3Reader reader = new MavenXpp3Reader(); + Model model = reader.read(new FileReader("pom.xml")); + sparkInterpreterSetting.setProperty("spark.jars", new File("target/zeppelin-interpreter-integration-" + model.getVersion() + ".jar").getAbsolutePath()); + // test SparkInterpreter Interpreter sparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.spark", "test"); @@ -93,6 +105,11 @@ public abstract class SparkIntegrationTest { assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code()); assertTrue(interpreterResult.message().get(0).getData().contains("45")); + // test jars & packages can be loaded correctly + interpreterResult = sparkInterpreter.interpret("import org.apache.zeppelin.interpreter.integration.DummyClass\n" + + "import com.maxmind.geoip2._", context); + assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code()); + // test PySparkInterpreter Interpreter pySparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.pyspark", "test"); interpreterResult = pySparkInterpreter.interpret("sqlContext.createDataFrame([(1,'a'),(2,'b')], ['id','name']).registerTempTable('test')", context); @@ -123,7 +140,7 @@ 
public abstract class SparkIntegrationTest { } @Test - public void testLocalMode() throws IOException, YarnException, InterpreterException { + public void testLocalMode() throws IOException, YarnException, InterpreterException, XmlPullParserException { InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark"); sparkInterpreterSetting.setProperty("master", "local[*]"); sparkInterpreterSetting.setProperty("SPARK_HOME", sparkHome); @@ -143,7 +160,7 @@ public abstract class SparkIntegrationTest { } @Test - public void testYarnClientMode() throws IOException, YarnException, InterruptedException, InterpreterException { + public void testYarnClientMode() throws IOException, YarnException, InterruptedException, InterpreterException, XmlPullParserException { InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark"); sparkInterpreterSetting.setProperty("master", "yarn-client"); sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath()); @@ -166,7 +183,7 @@ public abstract class SparkIntegrationTest { } @Test - public void testYarnClusterMode() throws IOException, YarnException, InterruptedException, InterpreterException { + public void testYarnClusterMode() throws IOException, YarnException, InterruptedException, InterpreterException, XmlPullParserException { InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark"); sparkInterpreterSetting.setProperty("master", "yarn-cluster"); sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());