This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new a64ccc777f3 [HUDI-6240] Adding default value as CORRECTED for rebase modes in write and read for avro (#8764)
a64ccc777f3 is described below
commit a64ccc777f36f57f759081172c6eda83aee11757
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat May 20 12:41:26 2023 -0400
    [HUDI-6240] Adding default value as CORRECTED for rebase modes in write and read for avro (#8764)
---------
Co-authored-by: Raymond Xu <[email protected]>
---
.../src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala | 3 ++-
.../src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala | 2 +-
.../org/apache/spark/sql/avro/HoodieSpark3_1AvroDeserializer.scala | 3 ++-
.../src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala | 3 ++-
.../org/apache/spark/sql/avro/HoodieSpark3_2AvroDeserializer.scala | 3 ++-
.../src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala | 3 ++-
.../org/apache/spark/sql/avro/HoodieSpark3_3AvroDeserializer.scala | 3 ++-
7 files changed, 13 insertions(+), 7 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 5fb6d907bdc..35b4cb9f3b5 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -57,7 +57,8 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
this(
rootAvroType,
rootCatalystType,
-      LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ)),
+      LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ,
+        LegacyBehaviorPolicy.CORRECTED.toString)),
new NoopFilters)
}
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 36d86c1e01f..e35b9271012 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -54,7 +54,7 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
   def this(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) = {
     this(rootCatalystType, rootAvroType, nullable,
       LegacyBehaviorPolicy.withName(SQLConf.get.getConf(
-        SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE)))
+        SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE, LegacyBehaviorPolicy.CORRECTED.toString)))
}
def serialize(catalystData: Any): Any = {
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3_1AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3_1AvroDeserializer.scala
index 7de9a8b28d0..63f5f9407e7 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3_1AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3_1AvroDeserializer.scala
@@ -27,7 +27,8 @@ class HoodieSpark3_1AvroDeserializer(rootAvroType: Schema, rootCatalystType: Dat
extends HoodieAvroDeserializer {
private val avroDeserializer = {
-    val avroRebaseModeInRead = LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ))
+    val avroRebaseModeInRead = LegacyBehaviorPolicy
+      .withName(SQLConf.get.getConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ, LegacyBehaviorPolicy.CORRECTED.toString))
new AvroDeserializer(rootAvroType, rootCatalystType, avroRebaseModeInRead,
new NoopFilters)
}
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index ba9812b0267..6a0323cacff 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -63,7 +63,8 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
   def this(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) = {
     this(rootCatalystType, rootAvroType, nullable, positionalFieldMatch = false,
-      LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE)))
+      LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE,
+        LegacyBehaviorPolicy.CORRECTED.toString)))
}
def serialize(catalystData: Any): Any = {
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3_2AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3_2AvroDeserializer.scala
index d839c73032c..d85bda6ca30 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3_2AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3_2AvroDeserializer.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.avro
import org.apache.avro.Schema
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.types.DataType
 class HoodieSpark3_2AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType)
extends HoodieAvroDeserializer {
private val avroDeserializer = new AvroDeserializer(rootAvroType,
rootCatalystType,
-    SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ))
+    SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ, LegacyBehaviorPolicy.CORRECTED.toString))
def deserialize(data: Any): Option[Any] = avroDeserializer.deserialize(data)
}
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 450d9d73465..277b8d0528a 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -62,7 +62,8 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
   def this(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) = {
     this(rootCatalystType, rootAvroType, nullable, positionalFieldMatch = false,
-      LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE)))
+      LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_WRITE,
+        LegacyBehaviorPolicy.CORRECTED.toString)))
}
def serialize(catalystData: Any): Any = {
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3_3AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3_3AvroDeserializer.scala
index 2a0bfaf0d10..be9ec205082 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3_3AvroDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3_3AvroDeserializer.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.avro
import org.apache.avro.Schema
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.types.DataType
 class HoodieSpark3_3AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType)
extends HoodieAvroDeserializer {
private val avroDeserializer = new AvroDeserializer(rootAvroType,
rootCatalystType,
-    SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ))
+    SQLConf.get.getConf(SQLConf.AVRO_REBASE_MODE_IN_READ, LegacyBehaviorPolicy.CORRECTED.toString))
def deserialize(data: Any): Option[Any] = avroDeserializer.deserialize(data)
}