This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 3c84c229d16 [SPARK-45328][SQL] Remove Hive support prior to 2.0.0 3c84c229d16 is described below commit 3c84c229d167a6ab2857649e91fff6f0d57bb12c Author: Hyukjin Kwon <gurwls...@apache.org> AuthorDate: Wed Sep 27 07:20:14 2023 +0900 [SPARK-45328][SQL] Remove Hive support prior to 2.0.0 ### What changes were proposed in this pull request? This PR proposes to remove Hive support prior to 2.0.0 (`spark.sql.hive.metastore.version`). ### Why are the changes needed? We dropped JDK 8 and 11, and Hive prior to 2.0.0 cannot work together. They are actually already the dead code. ### Does this PR introduce _any_ user-facing change? Technically no, because this wouldn't already work. ### How was this patch tested? Nope because there is no way to test them. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43116 from HyukjinKwon/SPARK-45328. Authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- docs/sql-migration-guide.md | 1 + .../org/apache/spark/sql/hive/HiveUtils.scala | 2 +- .../spark/sql/hive/client/HiveClientImpl.scala | 6 --- .../apache/spark/sql/hive/client/HiveShim.scala | 12 +++--- .../sql/hive/client/IsolatedClientLoader.scala | 6 --- .../org/apache/spark/sql/hive/client/package.scala | 46 +--------------------- .../spark/sql/hive/execution/HiveTempPath.scala | 40 ++----------------- .../spark/sql/hive/client/HiveClientVersions.scala | 7 +--- .../hive/client/HivePartitionFilteringSuites.scala | 3 +- 9 files changed, 16 insertions(+), 107 deletions(-) diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index 56a3c8292cd..a28f6fd284d 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -26,6 +26,7 @@ license: | - Since Spark 4.0, the default value of `spark.sql.maxSinglePartitionBytes` is changed from `Long.MaxValue` to `128m`. 
To restore the previous behavior, set `spark.sql.maxSinglePartitionBytes` to `9223372036854775807`(`Long.MaxValue`). - Since Spark 4.0, any read of SQL tables takes into consideration the SQL configs `spark.sql.files.ignoreCorruptFiles`/`spark.sql.files.ignoreMissingFiles` instead of the core config `spark.files.ignoreCorruptFiles`/`spark.files.ignoreMissingFiles`. +- Since Spark 4.0, `spark.sql.hive.metastore.version` drops the support of Hive prior to 2.0.0 as they require JDK 8, which Spark does not support anymore. Users should migrate to higher versions. ## Upgrading from Spark SQL 3.4 to 3.5 diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index a01246520f3..794838a1190 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -73,7 +73,7 @@ private[spark] object HiveUtils extends Logging { val HIVE_METASTORE_VERSION = buildStaticConf("spark.sql.hive.metastore.version") .doc("Version of the Hive metastore. 
Available options are " + - "<code>0.12.0</code> through <code>2.3.9</code> and " + + "<code>2.0.0</code> through <code>2.3.9</code> and " + "<code>3.0.0</code> through <code>3.1.3</code>.") .version("1.4.0") .stringConf diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index f3d7d7e66a5..4e4ef6ce9f7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -115,12 +115,6 @@ private[hive] class HiveClientImpl( private val outputBuffer = new CircularBuffer() private val shim = version match { - case hive.v12 => new Shim_v0_12() - case hive.v13 => new Shim_v0_13() - case hive.v14 => new Shim_v0_14() - case hive.v1_0 => new Shim_v1_0() - case hive.v1_1 => new Shim_v1_1() - case hive.v1_2 => new Shim_v1_2() case hive.v2_0 => new Shim_v2_0() case hive.v2_1 => new Shim_v2_1() case hive.v2_2 => new Shim_v2_2() diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 338498d3d48..e12fe857c88 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -255,7 +255,7 @@ private[client] sealed abstract class Shim { } } -private[client] class Shim_v0_12 extends Shim with Logging { +private class Shim_v0_12 extends Shim with Logging { // See HIVE-12224, HOLD_DDLTIME was broken as soon as it landed protected lazy val holdDDLTime = JBoolean.FALSE // deletes the underlying data along with metadata @@ -698,7 +698,7 @@ private[client] class Shim_v0_12 extends Shim with Logging { } } -private[client] class Shim_v0_13 extends Shim_v0_12 { +private class Shim_v0_13 extends Shim_v0_12 { private lazy val setCurrentSessionStateMethod = 
findStaticMethod( @@ -1222,7 +1222,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { } } -private[client] class Shim_v0_14 extends Shim_v0_13 { +private class Shim_v0_14 extends Shim_v0_13 { // true if this is an ACID operation protected lazy val isAcid = JBoolean.FALSE @@ -1341,9 +1341,9 @@ private[client] class Shim_v0_14 extends Shim_v0_13 { } -private[client] class Shim_v1_0 extends Shim_v0_14 +private class Shim_v1_0 extends Shim_v0_14 -private[client] class Shim_v1_1 extends Shim_v1_0 { +private class Shim_v1_1 extends Shim_v1_0 { // throws an exception if the index does not exist protected lazy val throwExceptionInDropIndex = JBoolean.TRUE @@ -1366,7 +1366,7 @@ private[client] class Shim_v1_1 extends Shim_v1_0 { } -private[client] class Shim_v1_2 extends Shim_v1_1 { +private class Shim_v1_2 extends Shim_v1_1 { // txnId can be 0 unless isAcid == true protected lazy val txnIdInLoadDynamicPartitions: JLong = 0L diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index 2765e6af521..d6489f04391 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -90,12 +90,6 @@ private[hive] object IsolatedClientLoader extends Logging { def hiveVersion(version: String): HiveVersion = { VersionUtils.majorMinorPatchVersion(version).flatMap { - case (12, _, _) | (0, 12, _) => Some(hive.v12) - case (13, _, _) | (0, 13, _) => Some(hive.v13) - case (14, _, _) | (0, 14, _) => Some(hive.v14) - case (1, 0, _) => Some(hive.v1_0) - case (1, 1, _) => Some(hive.v1_1) - case (1, 2, _) => Some(hive.v1_2) case (2, 0, _) => Some(hive.v2_0) case (2, 1, _) => Some(hive.v2_1) case (2, 2, _) => Some(hive.v2_2) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala index 9304074e866..a66842de7d8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala @@ -39,48 +39,6 @@ package object client { // scalastyle:off private[hive] object hive { - case object v12 extends HiveVersion("0.12.0") - case object v13 extends HiveVersion("0.13.1") - - // Do not need Calcite because we disabled hive.cbo.enable. - // - // The other excluded dependencies are nowhere to be found, so exclude them explicitly. If - // they're needed by the metastore client, users will have to dig them out of somewhere and use - // configuration to point Spark at the correct jars. - case object v14 extends HiveVersion("0.14.0", - exclusions = Seq("org.apache.calcite:calcite-core", - "org.apache.calcite:calcite-avatica", - "org.pentaho:pentaho-aggdesigner-algorithm")) - - case object v1_0 extends HiveVersion("1.0.1", - exclusions = Seq("eigenbase:eigenbase-properties", - "org.apache.calcite:calcite-core", - "org.apache.calcite:calcite-avatica", - "org.pentaho:pentaho-aggdesigner-algorithm", - "net.hydromatic:linq4j", - "net.hydromatic:quidem")) - - // The curator dependency was added to the exclusions here because it seems to confuse the ivy - // library. org.apache.curator:curator is a pom dependency but ivy tries to find the jar for it, - // and fails. 
- case object v1_1 extends HiveVersion("1.1.1", - exclusions = Seq("eigenbase:eigenbase-properties", - "org.apache.calcite:calcite-core", - "org.apache.calcite:calcite-avatica", - "org.apache.curator:*", - "org.pentaho:pentaho-aggdesigner-algorithm", - "net.hydromatic:linq4j", - "net.hydromatic:quidem")) - - case object v1_2 extends HiveVersion("1.2.2", - exclusions = Seq("eigenbase:eigenbase-properties", - "org.apache.calcite:calcite-core", - "org.apache.calcite:calcite-avatica", - "org.apache.curator:*", - "org.pentaho:pentaho-aggdesigner-algorithm", - "net.hydromatic:linq4j", - "net.hydromatic:quidem")) - case object v2_0 extends HiveVersion("2.0.1", exclusions = Seq("org.apache.calcite:calcite-core", "org.apache.calcite:calcite-avatica", @@ -131,8 +89,8 @@ package object client { "org.pentaho:pentaho-aggdesigner-algorithm", "org.apache.hive:hive-vector-code-gen")) - val allSupportedHiveVersions = - Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1) + val allSupportedHiveVersions: Set[HiveVersion] = + Set(v2_0, v2_1, v2_2, v2_3, v3_0, v3_1) } // scalastyle:on diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala index 9981ae4cc31..6fd8892fa1f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala @@ -33,7 +33,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.hive.HiveExternalCatalog -import org.apache.spark.sql.hive.client.HiveVersion class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: Path) extends Logging { @@ -44,54 +43,23 @@ class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: P private def getExternalTmpPath(path: Path): Path = 
{ import org.apache.spark.sql.hive.client.hive._ - // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under - // a common scratch directory. After the writing is finished, Hive will simply empty the table - // directory and move the staging directory to it. - // After Hive 1.1, Hive will create the staging directory under the table directory, and when + // Hive will create the staging directory under the table directory, and when // moving staging directory to table directory, Hive will still empty the table directory, but // will exclude the staging directory there. - // We have to follow the Hive behavior here, to avoid troubles. For example, if we create - // staging directory under the table director for Hive prior to 1.1, the staging directory will - // be removed by Hive when Hive is trying to empty the table directory. - val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0) - val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = - Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1) - - // Ensure all the supported versions are considered here. 
- assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath == - allSupportedHiveVersions) val externalCatalog = session.sharedState.externalCatalog val hiveVersion = externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") - val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive") - if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) { - oldVersionExternalTempPath(path, scratchDir) - } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) { - newVersionExternalTempPath(path, stagingDir) + if (allSupportedHiveVersions.contains(hiveVersion)) { + externalTempPath(path, stagingDir) } else { throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion) } } - // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13 - private def oldVersionExternalTempPath(path: Path, scratchDir: String): Path = { - val extURI: URI = path.toUri - val scratchPath = new Path(scratchDir, executionId) - var dirPath = new Path( - extURI.getScheme, - extURI.getAuthority, - scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID()) - - val fs = dirPath.getFileSystem(hadoopConf) - dirPath = new Path(fs.makeQualified(dirPath).toString()) - stagingDirForCreating = Some(dirPath) - dirPath - } - // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2 - private def newVersionExternalTempPath(path: Path, stagingDir: String): Path = { + private def externalTempPath(path: Path, stagingDir: String): Path = { val extURI: URI = path.toUri if (extURI.getScheme == "viewfs") { val qualifiedStagingDir = getStagingDir(path, stagingDir) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala index 6648c04a4c5..1dee9e6dcfc 100644 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala @@ -19,16 +19,11 @@ package org.apache.spark.sql.hive.client import scala.collection.immutable.IndexedSeq -import org.apache.commons.lang3.{JavaVersion, SystemUtils} - private[client] trait HiveClientVersions { private val testVersions = sys.env.get("SPARK_TEST_HIVE_CLIENT_VERSIONS") protected val versions = if (testVersions.nonEmpty) { testVersions.get.split(",").map(_.trim).filter(_.nonEmpty).toIndexedSeq - } else if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) { - IndexedSeq("2.0", "2.1", "2.2", "2.3", "3.0", "3.1") } else { - IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3", "3.0", - "3.1") + IndexedSeq("2.0", "2.1", "2.2", "2.3", "3.0", "3.1") } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuites.scala index a43e778b13b..f10e6386542 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuites.scala @@ -23,7 +23,6 @@ import org.scalatest.Suite class HivePartitionFilteringSuites extends Suite with HiveClientVersions { override def nestedSuites: IndexedSeq[Suite] = { - // Hive 0.12 does not provide the partition filtering API we call - versions.filterNot(_ == "0.12").map(new HivePartitionFilteringSuite(_)) + versions.map(new HivePartitionFilteringSuite(_)) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org