This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new fd6b0bc  [SPARK-25102][SQL][2.4] Write Spark version to ORC/Parquet 
file metadata
fd6b0bc is described below

commit fd6b0bccfc3b99b88f315f69ab78ce79d0f83c17
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Tue Apr 7 13:09:30 2020 -0700

    [SPARK-25102][SQL][2.4] Write Spark version to ORC/Parquet file metadata
    
    ### What changes were proposed in this pull request?
    
    This is a backport of https://github.com/apache/spark/pull/22932 .
    
    Currently, Spark writes Spark version number into Hive Table properties 
with `spark.sql.create.version`.
    ```
    parameters:{
      spark.sql.sources.schema.part.0={
        "type":"struct",
        "fields":[{"name":"a","type":"integer","nullable":true,"metadata":{}}]
      },
      transient_lastDdlTime=1541142761,
      spark.sql.sources.schema.numParts=1,
      spark.sql.create.version=2.4.0
    }
    ```
    
    This PR aims to write Spark versions to ORC/Parquet file metadata with 
`org.apache.spark.version` because we used the `org.apache.` prefix in 
Parquet metadata already. It's different from the Hive Table property key 
`spark.sql.create.version`, but it seems that we cannot change the Hive Table 
property for backward compatibility.
    
    After this PR, ORC and Parquet file generated by Spark will have the 
following metadata.
    
    **ORC (`native` and `hive` implementation)**
    ```
    $ orc-tools meta /tmp/o
    File Version: 0.12 with ...
    ...
    User Metadata:
      org.apache.spark.version=3.0.0
    ```
    
    **PARQUET**
    ```
    $ parquet-tools meta /tmp/p
    ...
    creator:     parquet-mr version 1.10.0 (build 
031a6654009e3b82020012a18434c582bd74c73a)
    extra:       org.apache.spark.version = 3.0.0
    extra:       org.apache.spark.sql.parquet.row.metadata = 
{"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}}]}
    ```
    
    ### Why are the changes needed?
    
    This backport helps us handle these files differently in Apache Spark 3.0.0.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the Jenkins with newly added test cases.
    
    Closes #28142 from dongjoon-hyun/SPARK-25102-2.4.
    
    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 core/src/main/scala/org/apache/spark/package.scala |  3 +++
 .../scala/org/apache/spark/util/VersionUtils.scala | 14 ++++++++++++
 .../org/apache/spark/util/VersionUtilsSuite.scala  | 25 ++++++++++++++++++++++
 .../datasources/orc/OrcOutputWriter.scala          | 15 +++++++++----
 .../sql/execution/datasources/orc/OrcUtils.scala   | 14 +++++++++---
 .../datasources/parquet/ParquetWriteSupport.scala  |  7 +++++-
 .../main/scala/org/apache/spark/sql/package.scala  |  9 ++++++++
 .../results/describe-part-after-analyze.sql.out    | 12 +++++------
 .../columnar/InMemoryColumnarQuerySuite.scala      |  4 ++--
 .../datasources/HadoopFsRelationSuite.scala        |  2 +-
 .../execution/datasources/orc/OrcSourceSuite.scala | 20 ++++++++++++++++-
 .../datasources/parquet/ParquetIOSuite.scala       | 21 +++++++++++++++++-
 .../apache/spark/sql/hive/orc/OrcFileFormat.scala  | 24 +++++++++++++++++++--
 .../apache/spark/sql/hive/StatisticsSuite.scala    |  8 +++----
 14 files changed, 153 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/package.scala 
b/core/src/main/scala/org/apache/spark/package.scala
index 8058a4d..5d0639e 100644
--- a/core/src/main/scala/org/apache/spark/package.scala
+++ b/core/src/main/scala/org/apache/spark/package.scala
@@ -19,6 +19,8 @@ package org.apache
 
 import java.util.Properties
 
+import org.apache.spark.util.VersionUtils
+
 /**
  * Core Spark functionality. [[org.apache.spark.SparkContext]] serves as the 
main entry point to
  * Spark, while [[org.apache.spark.rdd.RDD]] is the data type representing a 
distributed collection,
@@ -89,6 +91,7 @@ package object spark {
   }
 
   val SPARK_VERSION = SparkBuildInfo.spark_version
+  val SPARK_VERSION_SHORT = 
VersionUtils.shortVersion(SparkBuildInfo.spark_version)
   val SPARK_BRANCH = SparkBuildInfo.spark_branch
   val SPARK_REVISION = SparkBuildInfo.spark_revision
   val SPARK_BUILD_USER = SparkBuildInfo.spark_build_user
diff --git a/core/src/main/scala/org/apache/spark/util/VersionUtils.scala 
b/core/src/main/scala/org/apache/spark/util/VersionUtils.scala
index 828153b8..c0f8866 100644
--- a/core/src/main/scala/org/apache/spark/util/VersionUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/VersionUtils.scala
@@ -23,6 +23,7 @@ package org.apache.spark.util
 private[spark] object VersionUtils {
 
   private val majorMinorRegex = """^(\d+)\.(\d+)(\..*)?$""".r
+  private val shortVersionRegex = """^(\d+\.\d+\.\d+)(.*)?$""".r
 
   /**
    * Given a Spark version string, return the major version number.
@@ -37,6 +38,19 @@ private[spark] object VersionUtils {
   def minorVersion(sparkVersion: String): Int = 
majorMinorVersion(sparkVersion)._2
 
   /**
+   * Given a Spark version string, return the short version string.
+   * E.g., for 3.0.0-SNAPSHOT, return '3.0.0'.
+   */
+  def shortVersion(sparkVersion: String): String = {
+    shortVersionRegex.findFirstMatchIn(sparkVersion) match {
+      case Some(m) => m.group(1)
+      case None =>
+        throw new IllegalArgumentException(s"Spark tried to parse 
'$sparkVersion' as a Spark" +
+          s" version string, but it could not find the major/minor/maintenance 
version numbers.")
+    }
+  }
+
+  /**
    * Given a Spark version string, return the (major version number, minor 
version number).
    * E.g., for 2.0.1-SNAPSHOT, return (2, 0).
    */
diff --git a/core/src/test/scala/org/apache/spark/util/VersionUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/VersionUtilsSuite.scala
index b36d6be..56623eb 100644
--- a/core/src/test/scala/org/apache/spark/util/VersionUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/VersionUtilsSuite.scala
@@ -73,4 +73,29 @@ class VersionUtilsSuite extends SparkFunSuite {
       }
     }
   }
+
+  test("Return short version number") {
+    assert(shortVersion("3.0.0") === "3.0.0")
+    assert(shortVersion("3.0.0-SNAPSHOT") === "3.0.0")
+    withClue("shortVersion parsing should fail for missing maintenance version 
number") {
+      intercept[IllegalArgumentException] {
+        shortVersion("3.0")
+      }
+    }
+    withClue("shortVersion parsing should fail for invalid major version 
number") {
+      intercept[IllegalArgumentException] {
+        shortVersion("x.0.0")
+      }
+    }
+    withClue("shortVersion parsing should fail for invalid minor version 
number") {
+      intercept[IllegalArgumentException] {
+        shortVersion("3.x.0")
+      }
+    }
+    withClue("shortVersion parsing should fail for invalid maintenance version 
number") {
+      intercept[IllegalArgumentException] {
+        shortVersion("3.0.x")
+      }
+    }
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala
index 84755bf..7e38fc6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala
@@ -20,8 +20,9 @@ package org.apache.spark.sql.execution.datasources.orc
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.orc.mapred.OrcStruct
-import org.apache.orc.mapreduce.OrcOutputFormat
+import org.apache.orc.OrcFile
+import org.apache.orc.mapred.{OrcOutputFormat => OrcMapRedOutputFormat, 
OrcStruct}
+import org.apache.orc.mapreduce.{OrcMapreduceRecordWriter, OrcOutputFormat}
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.OutputWriter
@@ -36,11 +37,17 @@ private[orc] class OrcOutputWriter(
   private[this] val serializer = new OrcSerializer(dataSchema)
 
   private val recordWriter = {
-    new OrcOutputFormat[OrcStruct]() {
+    val orcOutputFormat = new OrcOutputFormat[OrcStruct]() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: 
String): Path = {
         new Path(path)
       }
-    }.getRecordWriter(context)
+    }
+    val filename = orcOutputFormat.getDefaultWorkFile(context, ".orc")
+    val options = OrcMapRedOutputFormat.buildOptions(context.getConfiguration)
+    val writer = OrcFile.createWriter(filename, options)
+    val recordWriter = new OrcMapreduceRecordWriter[OrcStruct](writer)
+    OrcUtils.addSparkVersionMetadata(writer)
+    recordWriter
   }
 
   override def write(row: InternalRow): Unit = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index 95fb25b..57d2c56 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -17,18 +17,19 @@
 
 package org.apache.spark.sql.execution.datasources.orc
 
+import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Locale
 
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.orc.{OrcFile, Reader, TypeDescription}
+import org.apache.orc.{OrcFile, Reader, TypeDescription, Writer}
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.types._
@@ -144,4 +145,11 @@ object OrcUtils extends Logging {
       }
     }
   }
+
+  /**
+   * Add a metadata specifying Spark version.
+   */
+  def addSparkVersionMetadata(writer: Writer): Unit = {
+    writer.addUserMetadata(SPARK_VERSION_METADATA_KEY, 
UTF_8.encode(SPARK_VERSION_SHORT))
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index b40b8c2..8814e3c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -29,7 +29,9 @@ import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
 import org.apache.parquet.io.api.{Binary, RecordConsumer}
 
+import org.apache.spark.SPARK_VERSION_SHORT
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -93,7 +95,10 @@ private[parquet] class ParquetWriteSupport extends 
WriteSupport[InternalRow] wit
     this.rootFieldWriters = 
schema.map(_.dataType).map(makeWriter).toArray[ValueWriter]
 
     val messageType = new 
SparkToParquetSchemaConverter(configuration).convert(schema)
-    val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> 
schemaString).asJava
+    val metadata = Map(
+      SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT,
+      ParquetReadSupport.SPARK_METADATA_KEY -> schemaString
+    ).asJava
 
     logInfo(
       s"""Initialized Parquet WriteSupport with Catalyst schema:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
index 161e010..354660e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
@@ -44,4 +44,13 @@ package object sql {
   type Strategy = SparkStrategy
 
   type DataFrame = Dataset[Row]
+
+  /**
+   * Metadata key which is used to write Spark version in the followings:
+   * - Parquet file metadata
+   * - ORC file metadata
+   *
+   * Note that Hive table property `spark.sql.create.version` also has Spark 
version.
+   */
+  private[sql] val SPARK_VERSION_METADATA_KEY = "org.apache.spark.version"
 }
diff --git 
a/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out
index 8ba69c6..7d87018 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out
@@ -93,7 +93,7 @@ Partition Values      [ds=2017-08-01, hr=10]
 Location [not included in 
comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10                       
 
 Created Time [not included in comparison]
 Last Access [not included in comparison]
-Partition Statistics   1121 bytes, 3 rows                          
+Partition Statistics   1189 bytes, 3 rows                          
                                                                    
 # Storage Information                                              
 Location [not included in comparison]sql/core/spark-warehouse/t
@@ -128,7 +128,7 @@ Partition Values            [ds=2017-08-01, hr=10]
 Location [not included in 
comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10                       
 
 Created Time [not included in comparison]
 Last Access [not included in comparison]
-Partition Statistics   1121 bytes, 3 rows                          
+Partition Statistics   1189 bytes, 3 rows                          
                                                                    
 # Storage Information                                              
 Location [not included in comparison]sql/core/spark-warehouse/t
@@ -155,7 +155,7 @@ Partition Values            [ds=2017-08-01, hr=11]
 Location [not included in 
comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11                       
 
 Created Time [not included in comparison]
 Last Access [not included in comparison]
-Partition Statistics   1098 bytes, 4 rows                          
+Partition Statistics   1166 bytes, 4 rows                          
                                                                    
 # Storage Information                                              
 Location [not included in comparison]sql/core/spark-warehouse/t
@@ -190,7 +190,7 @@ Partition Values            [ds=2017-08-01, hr=10]
 Location [not included in 
comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10                       
 
 Created Time [not included in comparison]
 Last Access [not included in comparison]
-Partition Statistics   1121 bytes, 3 rows                          
+Partition Statistics   1189 bytes, 3 rows                          
                                                                    
 # Storage Information                                              
 Location [not included in comparison]sql/core/spark-warehouse/t
@@ -217,7 +217,7 @@ Partition Values            [ds=2017-08-01, hr=11]
 Location [not included in 
comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11                       
 
 Created Time [not included in comparison]
 Last Access [not included in comparison]
-Partition Statistics   1098 bytes, 4 rows                          
+Partition Statistics   1166 bytes, 4 rows                          
                                                                    
 # Storage Information                                              
 Location [not included in comparison]sql/core/spark-warehouse/t
@@ -244,7 +244,7 @@ Partition Values            [ds=2017-09-01, hr=5]
 Location [not included in 
comparison]sql/core/spark-warehouse/t/ds=2017-09-01/hr=5                        
 
 Created Time [not included in comparison]
 Last Access [not included in comparison]
-Partition Statistics   1144 bytes, 2 rows                          
+Partition Statistics   1212 bytes, 2 rows                          
                                                                    
 # Storage Information                                              
 Location [not included in comparison]sql/core/spark-warehouse/t
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index b1b23e4..8e4d13d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -509,7 +509,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with 
SharedSQLContext {
             case plan: InMemoryRelation => plan
           }.head
           // InMemoryRelation's stats is file size before the underlying RDD 
is materialized
-          assert(inMemoryRelation.computeStats().sizeInBytes === 800)
+          assert(inMemoryRelation.computeStats().sizeInBytes === 868)
 
           // InMemoryRelation's stats is updated after materializing RDD
           dfFromFile.collect()
@@ -522,7 +522,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with 
SharedSQLContext {
 
           // Even CBO enabled, InMemoryRelation's stats keeps as the file size 
before table's stats
           // is calculated
-          assert(inMemoryRelation2.computeStats().sizeInBytes === 800)
+          assert(inMemoryRelation2.computeStats().sizeInBytes === 868)
 
           // InMemoryRelation's stats should be updated after calculating 
stats of the table
           // clear cache to simulate a fresh environment
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
index c1f2c18..6e08ee3 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
@@ -45,7 +45,7 @@ class HadoopFsRelationSuite extends QueryTest with 
SharedSQLContext {
     import testImplicits._
     Seq(1.0, 0.5).foreach { compressionFactor =>
       withSQLConf("spark.sql.sources.fileCompressionFactor" -> 
compressionFactor.toString,
-        "spark.sql.autoBroadcastJoinThreshold" -> "400") {
+        "spark.sql.autoBroadcastJoinThreshold" -> "434") {
         withTempPath { workDir =>
           // the file size is 740 bytes
           val workDirPath = workDir.getAbsolutePath
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index b6bb1d7..70079f0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.orc
 
 import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
 import java.sql.Timestamp
 import java.util.Locale
 
@@ -29,7 +30,8 @@ import org.apache.orc.OrcProto.Stream.Kind
 import org.apache.orc.impl.RecordReaderImpl
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.Row
+import org.apache.spark.SPARK_VERSION_SHORT
+import org.apache.spark.sql.{Row, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
@@ -243,6 +245,22 @@ abstract class OrcSuite extends OrcTest with 
BeforeAndAfterAll {
       checkAnswer(spark.read.orc(path.getCanonicalPath), Row(ts))
     }
   }
+
+  test("Write Spark version into ORC file metadata") {
+    withTempPath { path =>
+      spark.range(1).repartition(1).write.orc(path.getCanonicalPath)
+
+      val partFiles = path.listFiles()
+        .filter(f => f.isFile && !f.getName.startsWith(".") && 
!f.getName.startsWith("_"))
+      assert(partFiles.length === 1)
+
+      val orcFilePath = new Path(partFiles.head.getAbsolutePath)
+      val readerOptions = OrcFile.readerOptions(new Configuration())
+      val reader = OrcFile.createReader(orcFilePath, readerOptions)
+      val version = 
UTF_8.decode(reader.getMetadataValue(SPARK_VERSION_METADATA_KEY)).toString
+      assert(version === SPARK_VERSION_SHORT)
+    }
+  }
 }
 
 class OrcSourceSuite extends OrcSuite with SharedSQLContext {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 002c42f..6b05b9c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -27,6 +27,7 @@ import scala.reflect.runtime.universe.TypeTag
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.parquet.HadoopReadOptions
 import org.apache.parquet.column.{Encoding, ParquetProperties}
 import org.apache.parquet.example.data.{Group, GroupWriter}
 import org.apache.parquet.example.data.simple.SimpleGroup
@@ -34,10 +35,11 @@ import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.parquet.io.api.RecordConsumer
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeRow}
@@ -799,6 +801,23 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSQLContext {
       checkAnswer(spark.read.parquet(file.getAbsolutePath), Seq(Row(Row(1, 
null, "foo"))))
     }
   }
+
+  test("Write Spark version into Parquet metadata") {
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      spark.range(1).repartition(1).write.parquet(path)
+      val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
+
+      val conf = new Configuration()
+      val hadoopInputFile = HadoopInputFile.fromPath(new Path(file), conf)
+      val parquetReadOptions = HadoopReadOptions.builder(conf).build()
+      val m = ParquetFileReader.open(hadoopInputFile, parquetReadOptions)
+      val metaData = m.getFileMetaData.getKeyValueMetaData
+      m.close()
+
+      assert(metaData.get(SPARK_VERSION_METADATA_KEY) === SPARK_VERSION_SHORT)
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: 
TaskAttemptContext)
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index de8085f..8cf344b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -18,9 +18,11 @@
 package org.apache.spark.sql.hive.orc
 
 import java.net.URI
+import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Properties
 
 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -34,7 +36,9 @@ import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.orc.OrcConf.COMPRESS
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SPARK_VERSION_SHORT, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -275,12 +279,14 @@ private[orc] class OrcOutputWriter(
 
   override def close(): Unit = {
     if (recordWriterInstantiated) {
+      // Hive 1.2.1 ORC initializes its private `writer` field at the first 
write.
+      OrcFileFormat.addSparkVersionMetadata(recordWriter)
       recordWriter.close(Reporter.NULL)
     }
   }
 }
 
-private[orc] object OrcFileFormat extends HiveInspectors {
+private[orc] object OrcFileFormat extends HiveInspectors with Logging {
   // This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is 
unfortunately not public.
   private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
 
@@ -340,4 +346,18 @@ private[orc] object OrcFileFormat extends HiveInspectors {
     val (sortedIDs, sortedNames) = 
ids.zip(requestedSchema.fieldNames).sorted.unzip
     HiveShim.appendReadColumns(conf, sortedIDs, sortedNames)
   }
+
+  /**
+   * Add a metadata specifying Spark version.
+   */
+  def addSparkVersionMetadata(recordWriter: RecordWriter[NullWritable, 
Writable]): Unit = {
+    try {
+      val writerField = recordWriter.getClass.getDeclaredField("writer")
+      writerField.setAccessible(true)
+      val writer = writerField.get(recordWriter).asInstanceOf[Writer]
+      writer.addUserMetadata(SPARK_VERSION_METADATA_KEY, 
UTF_8.encode(SPARK_VERSION_SHORT))
+    } catch {
+      case NonFatal(e) => log.warn(e.toString, e)
+    }
+  }
 }
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index c3a8f90..da79bd1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -1355,23 +1355,23 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
           // analyze table
           sql(s"ANALYZE TABLE $tblName COMPUTE STATISTICS NOSCAN")
           var tableStats = getTableStats(tblName)
-          assert(tableStats.sizeInBytes == 567)
+          assert(tableStats.sizeInBytes == 601)
           assert(tableStats.rowCount.isEmpty)
 
           sql(s"ANALYZE TABLE $tblName COMPUTE STATISTICS")
           tableStats = getTableStats(tblName)
-          assert(tableStats.sizeInBytes == 567)
+          assert(tableStats.sizeInBytes == 601)
           assert(tableStats.rowCount.get == 1)
 
           // analyze a single partition
           sql(s"ANALYZE TABLE $tblName PARTITION (ds='2019-12-13') COMPUTE 
STATISTICS NOSCAN")
           var partStats = getPartitionStats(tblName, Map("ds" -> "2019-12-13"))
-          assert(partStats.sizeInBytes == 567)
+          assert(partStats.sizeInBytes == 601)
           assert(partStats.rowCount.isEmpty)
 
           sql(s"ANALYZE TABLE $tblName PARTITION (ds='2019-12-13') COMPUTE 
STATISTICS")
           partStats = getPartitionStats(tblName, Map("ds" -> "2019-12-13"))
-          assert(partStats.sizeInBytes == 567)
+          assert(partStats.sizeInBytes == 601)
           assert(partStats.rowCount.get == 1)
         }
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to