This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c4f748e8924 [SPARK-50112][SQL] Allowing the TransformWithState 
operator to use Avro encoding
2c4f748e8924 is described below

commit 2c4f748e892429f0575b578dbb7f9306a5d445a0
Author: Eric Marnadi <[email protected]>
AuthorDate: Tue Nov 5 15:00:20 2024 -0800

    [SPARK-50112][SQL] Allowing the TransformWithState operator to use Avro 
encoding
    
    ### What changes were proposed in this pull request?
    
    With the introduction of the TransformWithState operator, we are going to 
persist all state information in the StateStore with Avro encoding, instead of 
UnsafeRow. This is because UnsafeRow is not backwards compatible and can change 
between Spark releases, potentially causing StateStore corruption. Avro is 
backwards compatible and can be used reliably. This change moves the 
necessary files to the sql/core directory so Avro encoding can be used by 
stateful streaming operators.
    
    ### Why are the changes needed?
    
    To allow classes within sql/core, like the StateTypesEncoder class, to use 
Avro encoding.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing unit tests are sufficient to ensure compilation
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #48650 from ericm-db/moving-avro.
    
    Lead-authored-by: Eric Marnadi <[email protected]>
    Co-authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../sql/connect/client/CheckConnectJvmClientCompatibility.scala     | 3 +++
 project/MimaExcludes.scala                                          | 3 +++
 .../main/java/org/apache/spark/sql/avro/AvroCompressionCodec.java   | 0
 .../java/org/apache/spark/sql/avro/SparkAvroKeyOutputFormat.java    | 0
 .../src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala | 0
 .../src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala   | 3 +--
 .../src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala      | 0
 .../src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala | 0
 .../scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala   | 0
 .../src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala   | 6 ++----
 .../core}/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala  | 0
 .../src/main/scala/org/apache/spark/sql/avro/CustomDecimal.scala    | 3 +--
 .../src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala | 5 ++---
 13 files changed, 12 insertions(+), 11 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index d176cb3d0a44..ff3e674137a5 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -153,6 +153,9 @@ object CheckConnectJvmClientCompatibility {
       // Filter unsupported rules:
       // Note when muting errors for a method, checks on all overloading 
methods are also muted.
 
+      // Skip any avro files
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.avro.*"),
+
       // Skip unsupported packages
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.api.*"), // Java, 
Python, R
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.catalyst.*"),
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index f31a29788aaf..1c3e2f16cb0f 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -198,6 +198,9 @@ object MimaExcludes {
 
     // SPARK-49748: Add getCondition and deprecate getErrorClass in 
SparkThrowable
     
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.SparkThrowable.getCondition"),
+
+    // SPARK-50112: Moving avro files from connector to sql/core
+    ProblemFilters.exclude[Problem]("org.apache.spark.sql.avro.*"),
   ) ++ loggingExcludes("org.apache.spark.sql.DataFrameReader") ++
     loggingExcludes("org.apache.spark.sql.streaming.DataStreamReader") ++
     loggingExcludes("org.apache.spark.sql.SparkSession#Builder")
diff --git 
a/connector/avro/src/main/java/org/apache/spark/sql/avro/AvroCompressionCodec.java
 b/sql/core/src/main/java/org/apache/spark/sql/avro/AvroCompressionCodec.java
similarity index 100%
rename from 
connector/avro/src/main/java/org/apache/spark/sql/avro/AvroCompressionCodec.java
rename to 
sql/core/src/main/java/org/apache/spark/sql/avro/AvroCompressionCodec.java
diff --git 
a/connector/avro/src/main/java/org/apache/spark/sql/avro/SparkAvroKeyOutputFormat.java
 
b/sql/core/src/main/java/org/apache/spark/sql/avro/SparkAvroKeyOutputFormat.java
similarity index 100%
rename from 
connector/avro/src/main/java/org/apache/spark/sql/avro/SparkAvroKeyOutputFormat.java
rename to 
sql/core/src/main/java/org/apache/spark/sql/avro/SparkAvroKeyOutputFormat.java
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
 b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
similarity index 100%
rename from 
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
similarity index 98%
rename from 
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 264c3a1f48ab..3e1aa11b52b3 100755
--- 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -21,8 +21,7 @@ import java.io._
 
 import scala.util.control.NonFatal
 
-import org.apache.avro.{LogicalTypes, Schema}
-import org.apache.avro.LogicalType
+import org.apache.avro.{LogicalType, LogicalTypes, Schema}
 import org.apache.avro.file.DataFileReader
 import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
 import org.apache.avro.mapred.FsInput
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
similarity index 100%
rename from 
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
 b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
similarity index 100%
rename from 
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
similarity index 100%
rename from 
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
similarity index 98%
rename from 
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 1d9eada94658..814a28e24f52 100644
--- 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -21,14 +21,12 @@ import java.nio.ByteBuffer
 
 import scala.jdk.CollectionConverters._
 
+import org.apache.avro.{LogicalTypes, Schema}
 import org.apache.avro.Conversions.DecimalConversion
-import org.apache.avro.LogicalTypes
 import org.apache.avro.LogicalTypes.{LocalTimestampMicros, 
LocalTimestampMillis, TimestampMicros, TimestampMillis}
-import org.apache.avro.Schema
 import org.apache.avro.Schema.Type
 import org.apache.avro.Schema.Type._
-import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
-import org.apache.avro.generic.GenericData.Record
+import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed, Record}
 import org.apache.avro.util.Utf8
 
 import org.apache.spark.internal.Logging
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
similarity index 100%
rename from 
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/CustomDecimal.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/avro/CustomDecimal.scala
similarity index 97%
rename from 
connector/avro/src/main/scala/org/apache/spark/sql/avro/CustomDecimal.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/avro/CustomDecimal.scala
index fab3d4493e34..a5700a048153 100644
--- 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/CustomDecimal.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/CustomDecimal.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.sql.avro
 
-import org.apache.avro.LogicalType
-import org.apache.avro.Schema
+import org.apache.avro.{LogicalType, Schema}
 
 import org.apache.spark.sql.types.DecimalType
 
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
 b/sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
similarity index 98%
rename from 
connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
rename to 
sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
index 1168a887abd8..495fc011df46 100644
--- 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
@@ -23,13 +23,12 @@ import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
 import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
-import org.apache.avro.LogicalTypes.{Date, Decimal, LocalTimestampMicros, 
LocalTimestampMillis, TimestampMicros, TimestampMillis}
+import org.apache.avro.LogicalTypes.{Decimal, _}
 import org.apache.avro.Schema.Type._
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.{FIELD_NAME, FIELD_TYPE, 
RECURSIVE_DEPTH}
-import org.apache.spark.internal.MDC
 import org.apache.spark.sql.avro.AvroOptions.RECURSIVE_FIELD_MAX_DEPTH_LIMIT
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.types._


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to