This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 2097581dfb2 [SPARK-43186][SQL][HIVE] Remove workaround for FileSinkDesc
2097581dfb2 is described below

commit 2097581dfb2baf9b78e978aea2b7342e55923212
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Wed Apr 19 10:25:41 2023 -0700

[SPARK-43186][SQL][HIVE] Remove workaround for FileSinkDesc

### What changes were proposed in this pull request?

Remove `org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc`, which was used to work around a serialization issue in `org.apache.hadoop.hive.ql.plan.FileSinkDesc`.

### Why are the changes needed?

[HIVE-6171](https://issues.apache.org/jira/browse/HIVE-6171) changed the type of `FileSinkDesc`'s `dirName` property from `String` to `Path`, but `Path` was not serializable until [HADOOP-13519](https://issues.apache.org/jira/browse/HADOOP-13519) (fixed in Hadoop 3.0.0). Since SPARK-42452 removed support for Hadoop 2, we can now remove this workaround.
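To illustrate the failure mode the shim guarded against, here is a minimal standalone sketch (hypothetical, not part of this patch; `FileSinkDescSerDemo` is an assumed name, and it assumes `hive-exec` and `hadoop-common` are on the classpath):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}

// Round-trips a FileSinkDesc through Java serialization, as happens when the
// descriptor is shipped to executors as part of a serialized write task.
object FileSinkDescSerDemo {
  def main(args: Array[String]): Unit = {
    val desc = new FileSinkDesc(new Path("/tmp/hive-staging"), new TableDesc(), false)
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    // Against Hadoop 2.x this throws java.io.NotSerializableException for the
    // Path-typed dirName field; with Hadoop 3 (HADOOP-13519) Path implements
    // Serializable, so the write should succeed.
    out.writeObject(desc)
    out.close()
    println("FileSinkDesc serialized successfully")
  }
}
```

With Hadoop 3 guaranteed on the classpath, `FileSinkDesc` can be serialized directly and the `ShimFileSinkDesc` wrapper below becomes dead code.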
### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass GA.

Closes #40848 from pan3793/SPARK-43186.

Authored-by: Cheng Pan <cheng...@apache.org>
Signed-off-by: Chao Sun <sunc...@apple.com>
---
 .../scala/org/apache/spark/sql/hive/HiveShim.scala | 53 ----------------------
 .../spark/sql/hive/execution/HiveFileFormat.scala  |  2 +-
 .../hive/execution/InsertIntoHiveDirCommand.scala  |  4 +-
 .../sql/hive/execution/InsertIntoHiveTable.scala   |  4 +-
 4 files changed, 5 insertions(+), 58 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
index 6605d297010..0b9c55f6083 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -20,22 +20,18 @@ package org.apache.spark.sql.hive
 import java.rmi.server.UID
 
 import scala.collection.JavaConverters._
-import scala.language.implicitConversions
 
 import com.google.common.base.Objects
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities
 import org.apache.hadoop.hive.ql.exec.UDF
-import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMacro
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils
 import org.apache.hadoop.hive.serde2.avro.{AvroGenericRecordWritable, AvroSerdeUtils}
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector
 import org.apache.hadoop.io.Writable
 
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.types.Decimal
 import org.apache.spark.util.Utils
 
@@ -215,53 +211,4 @@
       }
     }
   }
-
-  /*
-   * Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member path is not.
-   * Fix it through wrapper.
-   */
-  implicit def wrapperToFileSinkDesc(w: ShimFileSinkDesc): FileSinkDesc = {
-    val f = new FileSinkDesc(new Path(w.dir), w.tableInfo, w.compressed)
-    f.setCompressCodec(w.compressCodec)
-    f.setCompressType(w.compressType)
-    f.setTableInfo(w.tableInfo)
-    f.setDestTableId(w.destTableId)
-    f
-  }
-
-  /*
-   * Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member path is not.
-   * Fix it through wrapper.
-   */
-  private[hive] class ShimFileSinkDesc(
-      var dir: String,
-      var tableInfo: TableDesc,
-      var compressed: Boolean)
-    extends Serializable with Logging {
-    var compressCodec: String = _
-    var compressType: String = _
-    var destTableId: Int = _
-
-    def setCompressed(compressed: Boolean): Unit = {
-      this.compressed = compressed
-    }
-
-    def getDirName(): String = dir
-
-    def setDestTableId(destTableId: Int): Unit = {
-      this.destTableId = destTableId
-    }
-
-    def setTableInfo(tableInfo: TableDesc): Unit = {
-      this.tableInfo = tableInfo
-    }
-
-    def setCompressCodec(intermediateCompressorCodec: String): Unit = {
-      compressCodec = intermediateCompressorCodec
-    }
-
-    def setCompressType(intermediateCompressType: String): Unit = {
-      compressType = intermediateCompressType
-    }
-  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index 7dc1fbb433c..29734c4de34 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc
 import org.apache.hadoop.hive.serde2.Serializer
 import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector}
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
@@ -37,7 +38,6 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
 import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
-import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableJobConf
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
index bd6278473a7..f69eab7f837 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.execution
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
@@ -101,8 +102,7 @@ case class InsertIntoHiveDirCommand(
     // The temporary path must be a HDFS path, not a local path.
     val hiveTempPath = new HiveTempPath(sparkSession, hadoopConf, qualifiedPath)
     val tmpPath = hiveTempPath.externalTempPath
-    val fileSinkConf = new org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc(
-      tmpPath.toString, tableDesc, false)
+    val fileSinkConf = new FileSinkDesc(tmpPath, tableDesc, false)
     setupHadoopConfForCompression(fileSinkConf, hadoopConf, sparkSession)
 
     hiveTempPath.createTmpPath()
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 2c9720e089a..658d00c46e4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.execution
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.sql.{Row, SparkSession}
@@ -33,7 +34,6 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.{FileFormat, V1WriteCommand, V1WritesUtils}
 import org.apache.spark.sql.hive.HiveExternalCatalog
-import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.hive.client.hive._
 
@@ -309,7 +309,7 @@ object InsertIntoHiveTable extends V1WritesHiveUtils {
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     val tableLocation = hiveQlTable.getDataLocation
     val hiveTempPath = new HiveTempPath(sparkSession, hadoopConf, tableLocation)
-    val fileSinkConf = new FileSinkDesc(hiveTempPath.externalTempPath.toString, tableDesc, false)
+    val fileSinkConf = new FileSinkDesc(hiveTempPath.externalTempPath, tableDesc, false)
     setupHadoopConfForCompression(fileSinkConf, hadoopConf, sparkSession)
 
     val fileFormat: FileFormat = new HiveFileFormat(fileSinkConf)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org