This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 2e34427d4f3 [SPARK-42539][SQL][HIVE] Eliminate separate classloader when using 'builtin' Hive version for metadata client 2e34427d4f3 is described below commit 2e34427d4f3f537484b9396cb491025aa21ef46b Author: Erik Krogen <xkro...@apache.org> AuthorDate: Wed Mar 1 13:45:31 2023 -0800 [SPARK-42539][SQL][HIVE] Eliminate separate classloader when using 'builtin' Hive version for metadata client ### What changes were proposed in this pull request? When using the 'builtin' Hive version for the Hive metadata client, do not create a separate classloader, and rather continue to use the overall user/application classloader (regardless of Java version). This standardizes the behavior for all Java versions with that of Java 9+. See SPARK-42539 for more details on why this approach was chosen. Please note that this is a re-submit of #40144. That one introduced test failures, and potentially a real issue, because the PR works by setting `isolationOn = false` for `builtin` mode. In addition to adjusting the classloader, `HiveClientImpl` relies on `isolationOn` to determine if it should use an isolated copy of `SessionState`, so the PR inadvertently switched to using a shared `SessionState` object. I think we do want to continue to have the isolated session state even in `buil [...] ### Why are the changes needed? Please see a much more detailed description in SPARK-42539. The tl;dr is that user-provided JARs (such as `hive-exec-2.3.8.jar`) take precedence over Spark/system JARs when constructing the classloader used by `IsolatedClientLoader` on Java 8 in 'builtin' mode, which can cause unexpected behavior and/or breakages. This violates the expectation that, unless user-first classloader mode is used, Spark JARs should be prioritized over user JARs. It also seems that this separate classloader [...] > attempt to discover the jars that were used to load Spark SQL and use those. This option is only valid when using the execution version of Hive. I can't follow the logic here; the user classloader clearly has all of the necessary Hive JARs, since that's where we're getting the JAR URLs from, so we could just use that directly instead of grabbing the URLs. When this was initially added, it only used the JARs from the user classloader, not any of its parents, which I suspect was the motivating factor (to try to avoid more Spark classes being duplicated inside of the isolated classloader, I guess). But that was changed a month la [...] ### Does this PR introduce _any_ user-facing change? No, except to protect Spark itself from potentially being broken by bad user JARs. ### How was this patch tested? This includes a new unit test in `HiveUtilsSuite` which demonstrates the issue and shows that this approach resolves it. It has also been tested on a live cluster running Java 8 and Hive communication functionality continues to work as expected. Unit tests failing in #40144 have been locally tested (`HiveUtilsSuite`, `HiveSharedStateSuite`, `HiveCliSessionStateSuite`, `JsonHadoopFsRelationSuite`). Closes #40224 from xkrogen/xkrogen/SPARK-42539/hive-isolatedclientloader-builtin-user-jar-conflict-fix/take2. Authored-by: Erik Krogen <xkro...@apache.org> Signed-off-by: Chao Sun <sunc...@apple.com> --- .../main/scala/org/apache/spark/TestUtils.scala | 5 +- .../org/apache/spark/sql/hive/HiveUtils.scala | 37 +-------- .../spark/sql/hive/client/HiveClientImpl.scala | 2 +- .../sql/hive/client/IsolatedClientLoader.scala | 94 ++++++++++++---------- .../org/apache/spark/sql/hive/HiveUtilsSuite.scala | 34 +++++++- 5 files changed, 91 insertions(+), 81 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index bdf81d22efa..13ae6aca38b 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -193,12 +193,15 @@ private[spark] object TestUtils { baseClass: String = null, classpathUrls: Seq[URL] = Seq.empty, implementsClasses: Seq[String] = Seq.empty, - extraCodeBody: String = ""): File = { + extraCodeBody: String = "", + packageName: Option[String] = None): File = { val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("") val implementsText = "implements " + (implementsClasses :+ "java.io.Serializable").mkString(", ") + val packageText = packageName.map(p => s"package $p;\n").getOrElse("") val sourceFile = new JavaSourceFromString(className, s""" + |$packageText |public class $className $extendsText $implementsText { | @Override public String toString() { return "$toStringValue"; } | diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index fe9bdef3d0e..4637a4a0179 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive import java.io.File -import java.net.{URL, URLClassLoader} +import java.net.URL import java.util.Locale import java.util.concurrent.TimeUnit @@ -26,7 +26,6 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap import scala.util.Try -import org.apache.commons.lang3.{JavaVersion, SystemUtils} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars @@ -46,7 +45,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf._ import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH import org.apache.spark.sql.types._ -import org.apache.spark.util.{ChildFirstURLClassLoader, Utils} +import org.apache.spark.util.Utils private[spark] object HiveUtils extends Logging { @@ -409,43 +408,15 @@ private[spark] object HiveUtils extends Logging { s"or change ${HIVE_METASTORE_VERSION.key} to $builtinHiveVersion.") } - // We recursively find all jars in the class loader chain, - // starting from the given classLoader. - def allJars(classLoader: ClassLoader): Array[URL] = classLoader match { - case null => Array.empty[URL] - case childFirst: ChildFirstURLClassLoader => - childFirst.getURLs() ++ allJars(Utils.getSparkClassLoader) - case urlClassLoader: URLClassLoader => - urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent) - case other => allJars(other.getParent) - } - - val classLoader = Utils.getContextOrSparkClassLoader - val jars: Array[URL] = if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) { - // Do nothing. The system classloader is no longer a URLClassLoader in Java 9, - // so it won't match the case in allJars. It no longer exposes URLs of - // the system classpath - Array.empty[URL] - } else { - val loadedJars = allJars(classLoader) - // Verify at least one jar was found - if (loadedJars.length == 0) { - throw new IllegalArgumentException( - "Unable to locate hive jars to connect to metastore. " + - s"Please set ${HIVE_METASTORE_JARS.key}.") - } - loadedJars - } - logInfo( s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.") new IsolatedClientLoader( version = metaVersion, sparkConf = conf, hadoopConf = hadoopConf, - execJars = jars.toSeq, config = configurations, - isolationOn = !isCliSessionState(), + isolationOn = false, + sessionStateIsolationOverride = Some(!isCliSessionState()), barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) } else if (hiveMetastoreJars == "maven") { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 0a83ec2689c..f76cc7f3a41 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -134,7 +134,7 @@ private[hive] class HiveClientImpl( // Create an internal session state for this HiveClientImpl. val state: SessionState = { val original = Thread.currentThread().getContextClassLoader - if (clientLoader.isolationOn) { + if (clientLoader.sessionStateIsolationOn) { // Switch to the initClassLoader. Thread.currentThread().setContextClassLoader(initClassLoader) try { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index e65e6d42937..2756a2b18a9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -180,6 +180,9 @@ private[hive] object IsolatedClientLoader extends Logging { * @param config A set of options that will be added to the HiveConf of the constructed client. * @param isolationOn When true, custom versions of barrier classes will be constructed. Must be * true unless loading the version of hive that is on Spark's classloader. + * @param sessionStateIsolationOverride If present, this parameter will specify the value of + * `sessionStateIsolationOn`. If empty (the default), the + * value of `isolationOn` will be used. * @param baseClassLoader The spark classloader that is used to load shared classes. */ private[hive] class IsolatedClientLoader( @@ -189,11 +192,19 @@ private[hive] class IsolatedClientLoader( val execJars: Seq[URL] = Seq.empty, val config: Map[String, String] = Map.empty, val isolationOn: Boolean = true, + sessionStateIsolationOverride: Option[Boolean] = None, val baseClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader, val sharedPrefixes: Seq[String] = Seq.empty, val barrierPrefixes: Seq[String] = Seq.empty) extends Logging { + /** + * This controls whether the generated clients maintain an independent/isolated copy of the + * Hive `SessionState`. If false, the Hive will leverage the global/static copy of + * `SessionState`; if true, it will generate a new copy of the state internally. + */ + val sessionStateIsolationOn: Boolean = sessionStateIsolationOverride.getOrElse(isolationOn) + /** All jars used by the hive specific classloader. */ protected def allJars = execJars.toArray @@ -232,51 +243,46 @@ private[hive] class IsolatedClientLoader( private[hive] val classLoader: MutableURLClassLoader = { val isolatedClassLoader = if (isolationOn) { - if (allJars.isEmpty) { - // See HiveUtils; this is the Java 9+ + builtin mode scenario - baseClassLoader - } else { - val rootClassLoader: ClassLoader = - if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) { - // In Java 9, the boot classloader can see few JDK classes. The intended parent - // classloader for delegation is now the platform classloader. - // See http://java9.wtf/class-loading/ - val platformCL = - classOf[ClassLoader].getMethod("getPlatformClassLoader"). - invoke(null).asInstanceOf[ClassLoader] - // Check to make sure that the root classloader does not know about Hive. - assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure) - platformCL + val rootClassLoader: ClassLoader = + if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) { + // In Java 9, the boot classloader can see few JDK classes. The intended parent + // classloader for delegation is now the platform classloader. + // See http://java9.wtf/class-loading/ + val platformCL = + classOf[ClassLoader].getMethod("getPlatformClassLoader"). + invoke(null).asInstanceOf[ClassLoader] + // Check to make sure that the root classloader does not know about Hive. + assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure) + platformCL + } else { + // The boot classloader is represented by null (the instance itself isn't accessible) + // and before Java 9 can see all JDK classes + null + } + new URLClassLoader(allJars, rootClassLoader) { + override def loadClass(name: String, resolve: Boolean): Class[_] = { + val loaded = findLoadedClass(name) + if (loaded == null) doLoadClass(name, resolve) else loaded + } + def doLoadClass(name: String, resolve: Boolean): Class[_] = { + val classFileName = name.replaceAll("\\.", "/") + ".class" + if (isBarrierClass(name)) { + // For barrier classes, we construct a new copy of the class. + val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName)) + logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}") + defineClass(name, bytes, 0, bytes.length) + } else if (!isSharedClass(name)) { + logDebug(s"hive class: $name - ${getResource(classToPath(name))}") + super.loadClass(name, resolve) } else { - // The boot classloader is represented by null (the instance itself isn't accessible) - // and before Java 9 can see all JDK classes - null - } - new URLClassLoader(allJars, rootClassLoader) { - override def loadClass(name: String, resolve: Boolean): Class[_] = { - val loaded = findLoadedClass(name) - if (loaded == null) doLoadClass(name, resolve) else loaded - } - def doLoadClass(name: String, resolve: Boolean): Class[_] = { - val classFileName = name.replaceAll("\\.", "/") + ".class" - if (isBarrierClass(name)) { - // For barrier classes, we construct a new copy of the class. - val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName)) - logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}") - defineClass(name, bytes, 0, bytes.length) - } else if (!isSharedClass(name)) { - logDebug(s"hive class: $name - ${getResource(classToPath(name))}") - super.loadClass(name, resolve) - } else { - // For shared classes, we delegate to baseClassLoader, but fall back in case the - // class is not found. - logDebug(s"shared class: $name") - try { - baseClassLoader.loadClass(name) - } catch { - case _: ClassNotFoundException => - super.loadClass(name, resolve) - } + // For shared classes, we delegate to baseClassLoader, but fall back in case the + // class is not found. + logDebug(s"shared class: $name") + try { + baseClassLoader.loadClass(name) + } catch { + case _: ClassNotFoundException => + super.loadClass(name, resolve) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala index d8e1e012928..823ac8ed957 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala @@ -17,15 +17,19 @@ package org.apache.spark.sql.hive +import java.io.File +import java.net.URI + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, TestUtils} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.catalyst.catalog.CatalogDatabase import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.util.ChildFirstURLClassLoader +import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader} class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { @@ -77,6 +81,32 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton } } + test("SPARK-42539: User-provided JARs should not take precedence over builtin Hive JARs") { + withTempDir { tmpDir => + val classFile = TestUtils.createCompiledClass( + "Hive", tmpDir, packageName = Some("org.apache.hadoop.hive.ql.metadata")) + + val jarFile = new File(tmpDir, "hive-fake.jar") + TestUtils.createJar(Seq(classFile), jarFile, Some("org/apache/hadoop/hive/ql/metadata")) + + val conf = new SparkConf + val contextClassLoader = Thread.currentThread().getContextClassLoader + val loader = new MutableURLClassLoader(Array(jarFile.toURI.toURL), contextClassLoader) + try { + Thread.currentThread().setContextClassLoader(loader) + val client = HiveUtils.newClientForMetadata( + conf, + SparkHadoopUtil.newConfiguration(conf), + HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true)) + client.createDatabase( + CatalogDatabase("foo", "", URI.create(s"file://${tmpDir.getAbsolutePath}/foo.db"), Map()), + ignoreIfExists = true) + } finally { + Thread.currentThread().setContextClassLoader(contextClassLoader) + } + } + } + test("SPARK-27349: Dealing with TimeVars removed in Hive 2.x") { // Test default value val defaultConf = new Configuration --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org