This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2097581dfb2 [SPARK-43186][SQL][HIVE] Remove workaround for FileSinkDesc
2097581dfb2 is described below

commit 2097581dfb2baf9b78e978aea2b7342e55923212
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Wed Apr 19 10:25:41 2023 -0700

    [SPARK-43186][SQL][HIVE] Remove workaround for FileSinkDesc
    
    ### What changes were proposed in this pull request?
    
    Remove `org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc`, which was used to work around the serialization issue of `org.apache.hadoop.hive.ql.plan.FileSinkDesc`.
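
    For reference, a minimal sketch of the call-site change (mirroring the diff below); the object and method names here are illustrative only, not part of this patch:

    ```scala
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}

    object FileSinkDescUsage {
      // Before: wrap the directory as a String in HiveShim.ShimFileSinkDesc and rely on the
      // implicit HiveShim.wrapperToFileSinkDesc conversion wherever Hive needs a FileSinkDesc:
      //   new HiveShim.ShimFileSinkDesc(tmpPath.toString, tableDesc, false)

      // After: construct Hive's FileSinkDesc directly, since Path serializes fine on Hadoop 3.
      def makeSink(tmpPath: Path, tableDesc: TableDesc): FileSinkDesc =
        new FileSinkDesc(tmpPath, tableDesc, false)
    }
    ```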
    
    ### Why are the changes needed?
    
    [HIVE-6171](https://issues.apache.org/jira/browse/HIVE-6171) changed `FileSinkDesc`'s property from `String dirName` to `Path dirName`, but `Path` was not serializable until [HADOOP-13519](https://issues.apache.org/jira/browse/HADOOP-13519), which was fixed in Hadoop 3.0.0.
    
    Since SPARK-42452 removed support for Hadoop 2, we can remove this workaround now.
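
    As context for why the wrapper can go away: with Hadoop 3.x on the classpath, `org.apache.hadoop.fs.Path` round-trips through plain Java serialization, so a `FileSinkDesc` holding a `Path` can be shipped to executors as-is. A minimal, illustrative check (not part of this patch):

    ```scala
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    import org.apache.hadoop.fs.Path

    object PathSerializationCheck {
      // Round-trip an arbitrary value through Java serialization.
      def roundTrip[T](value: T): T = {
        val buffer = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(buffer)
        out.writeObject(value)
        out.close()
        val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
        in.readObject().asInstanceOf[T]
      }

      def main(args: Array[String]): Unit = {
        val original = new Path("hdfs://nn:8020/tmp/hive-staging")
        // Before HADOOP-13519 (i.e. on Hadoop 2.x) this throws NotSerializableException;
        // since Hadoop 3.0.0 it succeeds.
        val copy = roundTrip(original)
        assert(copy == original, s"expected $original, got $copy")
      }
    }
    ```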
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass GA.
    
    Closes #40848 from pan3793/SPARK-43186.
    
    Authored-by: Cheng Pan <cheng...@apache.org>
    Signed-off-by: Chao Sun <sunc...@apple.com>
---
 .../scala/org/apache/spark/sql/hive/HiveShim.scala | 53 ----------------------
 .../spark/sql/hive/execution/HiveFileFormat.scala  |  2 +-
 .../hive/execution/InsertIntoHiveDirCommand.scala  |  4 +-
 .../sql/hive/execution/InsertIntoHiveTable.scala   |  4 +-
 4 files changed, 5 insertions(+), 58 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
index 6605d297010..0b9c55f6083 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -20,22 +20,18 @@ package org.apache.spark.sql.hive
 import java.rmi.server.UID
 
 import scala.collection.JavaConverters._
-import scala.language.implicitConversions
 
 import com.google.common.base.Objects
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities
 import org.apache.hadoop.hive.ql.exec.UDF
-import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMacro
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils
 import org.apache.hadoop.hive.serde2.avro.{AvroGenericRecordWritable, AvroSerdeUtils}
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector
 import org.apache.hadoop.io.Writable
 
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.types.Decimal
 import org.apache.spark.util.Utils
 
@@ -215,53 +211,4 @@ private[hive] object HiveShim {
       }
     }
   }
-
-  /*
-   * Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member path is not.
-   * Fix it through wrapper.
-   */
-  implicit def wrapperToFileSinkDesc(w: ShimFileSinkDesc): FileSinkDesc = {
-    val f = new FileSinkDesc(new Path(w.dir), w.tableInfo, w.compressed)
-    f.setCompressCodec(w.compressCodec)
-    f.setCompressType(w.compressType)
-    f.setTableInfo(w.tableInfo)
-    f.setDestTableId(w.destTableId)
-    f
-  }
-
-  /*
-   * Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member path is not.
-   * Fix it through wrapper.
-   */
-  private[hive] class ShimFileSinkDesc(
-      var dir: String,
-      var tableInfo: TableDesc,
-      var compressed: Boolean)
-    extends Serializable with Logging {
-    var compressCodec: String = _
-    var compressType: String = _
-    var destTableId: Int = _
-
-    def setCompressed(compressed: Boolean): Unit = {
-      this.compressed = compressed
-    }
-
-    def getDirName(): String = dir
-
-    def setDestTableId(destTableId: Int): Unit = {
-      this.destTableId = destTableId
-    }
-
-    def setTableInfo(tableInfo: TableDesc): Unit = {
-      this.tableInfo = tableInfo
-    }
-
-    def setCompressCodec(intermediateCompressorCodec: String): Unit = {
-      compressCodec = intermediateCompressorCodec
-    }
-
-    def setCompressType(intermediateCompressType: String): Unit = {
-      compressType = intermediateCompressType
-    }
-  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index 7dc1fbb433c..29734c4de34 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc
 import org.apache.hadoop.hive.serde2.Serializer
 import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector}
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
@@ -37,7 +38,6 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
 import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
-import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableJobConf
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
index bd6278473a7..f69eab7f837 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.execution
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
@@ -101,8 +102,7 @@ case class InsertIntoHiveDirCommand(
     // The temporary path must be a HDFS path, not a local path.
     val hiveTempPath = new HiveTempPath(sparkSession, hadoopConf, qualifiedPath)
     val tmpPath = hiveTempPath.externalTempPath
-    val fileSinkConf = new org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc(
-      tmpPath.toString, tableDesc, false)
+    val fileSinkConf = new FileSinkDesc(tmpPath, tableDesc, false)
     setupHadoopConfForCompression(fileSinkConf, hadoopConf, sparkSession)
     hiveTempPath.createTmpPath()
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 2c9720e089a..658d00c46e4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.execution
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.sql.{Row, SparkSession}
@@ -33,7 +34,6 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.{FileFormat, V1WriteCommand, V1WritesUtils}
 import org.apache.spark.sql.hive.HiveExternalCatalog
-import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.hive.client.hive._
 
@@ -309,7 +309,7 @@ object InsertIntoHiveTable extends V1WritesHiveUtils {
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     val tableLocation = hiveQlTable.getDataLocation
     val hiveTempPath = new HiveTempPath(sparkSession, hadoopConf, tableLocation)
-    val fileSinkConf = new FileSinkDesc(hiveTempPath.externalTempPath.toString, tableDesc, false)
+    val fileSinkConf = new FileSinkDesc(hiveTempPath.externalTempPath, tableDesc, false)
     setupHadoopConfForCompression(fileSinkConf, hadoopConf, sparkSession)
     val fileFormat: FileFormat = new HiveFileFormat(fileSinkConf)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
