danny0405 commented on code in PR #7702:
URL: https://github.com/apache/hudi/pull/7702#discussion_r1080809686
##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieCommonKryoRegistrar.java:
##########
@@ -81,7 +83,8 @@ public Class<?>[] registerClasses() {
HoodieRecordLocation.class,
HoodieRecordGlobalLocation.class
- };
+ })
+ .forEachOrdered(kryo::register);
Review Comment:
A stateless function (one with no side effects) is always a better choice,
especially for a utility method; personally I prefer the old way we handled
this.
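The contrast the comment draws could be sketched roughly as below. This is a plain-Java illustration, not Hudi code: `RegistrarSketch` and its methods are hypothetical, and a `Consumer<Class<?>>` stands in for `kryo::register`.

```java
import java.util.List;
import java.util.function.Consumer;

public class RegistrarSketch {
    // Stateless style (the "old way"): the tool method only reports
    // which classes should be registered; the caller decides what to do.
    static List<Class<?>> classesToRegister() {
        return List.of(String.class, Integer.class);
    }

    // Side-effecting style: the method mutates the serializer it is handed
    // (here modeled as a Consumer standing in for kryo::register).
    static void registerClasses(Consumer<Class<?>> kryoRegister) {
        classesToRegister().forEach(kryoRegister);
    }

    public static void main(String[] args) {
        // The stateless variant is trivially inspectable in isolation.
        System.out.println(classesToRegister().size()); // prints 2
    }
}
```

The stateless variant can be unit-tested and reused without constructing a serializer, which is the property the reviewer is pointing at.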
##########
pom.xml:
##########
@@ -440,34 +441,13 @@
<!-- common to all bundles -->
<artifactSet>
<includes>
- <!-- com.esotericsoftware:kryo-shaded -->
- <include>com.esotericsoftware:kryo-shaded</include>
- <include>com.esotericsoftware:minlog</include>
- <include>org.objenesis:objenesis</include>
<!-- org.apache.httpcomponents -->
<include>org.apache.httpcomponents:httpclient</include>
<include>org.apache.httpcomponents:httpcore</include>
<include>org.apache.httpcomponents:fluent-hc</include>
</includes>
</artifactSet>
<relocations>
- <!-- com.esotericsoftware:kryo-shaded -->
- <relocation>
- <pattern>com.esotericsoftware.kryo.</pattern>
-          <shadedPattern>org.apache.hudi.com.esotericsoftware.kryo.</shadedPattern>
- </relocation>
Review Comment:
What is the purpose of moving the common bundle dependencies into each
bundle's pom file?
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java:
##########
@@ -92,10 +93,18 @@ public class HoodieClientTestUtils {
*/
public static SparkConf getSparkConfForTest(String appName) {
SparkConf sparkConf = new SparkConf().setAppName(appName)
-        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").setMaster("local[4]")
+ .setMaster("local[4]")
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+        .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
+        .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
Review Comment:
Can we also move these common options into a tool method?
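The extraction the reviewer suggests could look roughly like the sketch below. It is a hypothetical helper (`SparkTestConfs` is not a real Hudi class), and a `Map<String, String>` stands in for `SparkConf` so the sketch stays self-contained.

```java
import java.util.HashMap;
import java.util.Map;

public class SparkTestConfs {
    // Hypothetical tool method gathering the common test options in one
    // place; in real code these would be applied to a SparkConf via .set().
    static Map<String, String> commonSparkTestOptions() {
        Map<String, String> opts = new HashMap<>();
        opts.put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        opts.put("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
        opts.put("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
        return opts;
    }
}
```

Each test setup would then apply this one map instead of repeating the three `.set(...)` calls.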
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala:
##########
@@ -51,13 +52,6 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
DateTimeZone.setDefault(DateTimeZone.UTC)
TimeZone.setDefault(DateTimeUtils.getTimeZone("UTC"))
protected lazy val spark: SparkSession = SparkSession.builder()
- .master("local[1]")
- .appName("hoodie sql test")
- .withExtensions(new HoodieSparkSessionExtension)
- .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .config("hoodie.insert.shuffle.parallelism", "4")
- .config("hoodie.upsert.shuffle.parallelism", "4")
- .config("hoodie.delete.shuffle.parallelism", "4")
Review Comment:
Not sure whether we can remove these parallelism options.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala:
##########
@@ -42,22 +43,31 @@ import org.apache.spark.serializer.KryoRegistrator
* or renamed (w/o correspondingly updating such usages)</li>
* </ol>
*/
-class HoodieSparkKryoProvider extends HoodieCommonKryoProvider {
- override def registerClasses(): Array[Class[_]] = {
+class HoodieSparkKryoRegistrar extends HoodieCommonKryoRegistrar with KryoRegistrator {
+ override def registerClasses(kryo: Kryo): Unit = {
///////////////////////////////////////////////////////////////////////////
// NOTE: DO NOT REORDER REGISTRATIONS
///////////////////////////////////////////////////////////////////////////
- val classes = super[HoodieCommonKryoProvider].registerClasses()
- classes ++ Array(
- classOf[HoodieWriteConfig],
- classOf[HoodieSparkRecord],
- classOf[HoodieInternalRow]
- )
+ super[HoodieCommonKryoRegistrar].registerClasses(kryo)
+
+ kryo.register(classOf[HoodieWriteConfig])
+
+ kryo.register(classOf[HoodieSparkRecord])
+ kryo.register(classOf[HoodieInternalRow])
+
+    // NOTE: Hadoop's configuration is not a serializable object by itself, and hence
+    //       we're relying on the [[SerializableConfiguration]] wrapper to work around that
+ kryo.register(classOf[SerializableConfiguration], new JavaSerializer())
}
}
-object HoodieSparkKryoProvider {
+object HoodieSparkKryoRegistrar {
+
+ // NOTE: We're copying definition of the config introduced in Spark 3.0
+ // (to stay compatible w/ Spark 2.4)
+ private val KRYO_USER_REGISTRATORS = "spark.kryo.registrator"
+
def register(conf: SparkConf): SparkConf = {
- conf.registerKryoClasses(new HoodieSparkKryoProvider().registerClasses())
+    conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[HoodieSparkKryoRegistrar].getName).mkString(","))
}
Review Comment:
Does `.mkString(",")` make sense here?
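The point behind the question can be checked directly: joining a one-element sequence with `","` is a no-op, so the separator only earns its keep if a second registrator is ever appended. A quick Java analogue (using `String.join` in place of Scala's `mkString`; the class names are illustrative):

```java
import java.util.List;

public class MkStringCheck {
    public static void main(String[] args) {
        // Joining a single-element list: the separator never appears.
        String single = String.join(",",
                List.of("org.apache.spark.HoodieSparkKryoRegistrar"));
        System.out.println(single);

        // The join only matters once another registrator is in the list,
        // e.g. preserving a registrator the user already configured.
        String combined = String.join(",",
                List.of("UserRegistrar", "HoodieSparkKryoRegistrar"));
        System.out.println(combined);
    }
}
```

So as written the `.mkString(",")` is redundant; it would only be meaningful if the code appended to any pre-existing `spark.kryo.registrator` value rather than overwriting it.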
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]