lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1114062796
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala:
##########
@@ -116,10 +116,74 @@ object HoodieSparkUtils extends SparkAdapterSupport with
SparkVersionsSupport {
}, SQLConf.get)
}
- def getCatalystRowSerDe(structType: StructType) : SparkRowSerDe = {
- sparkAdapter.createSparkRowSerDe(structType)
- }
-
private def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
new SQLConfInjectingRDD(rdd, conf)
+
+ def safeCreateRDD(df: DataFrame, structName: String, recordNamespace:
String, reconcileToLatestSchema: Boolean,
+ latestTableSchema:
org.apache.hudi.common.util.Option[Schema] =
org.apache.hudi.common.util.Option.empty()):
+ Tuple2[RDD[GenericRecord], RDD[String]] = {
+ var latestTableSchemaConverted: Option[Schema] = None
+
+ if (latestTableSchema.isPresent && reconcileToLatestSchema) {
+ latestTableSchemaConverted = Some(latestTableSchema.get())
+ } else {
+ // cases when users want to use latestTableSchema but have not turned on
reconcileToLatestSchema explicitly
+ // for example, when using a Transformer implementation to transform
source RDD to target RDD
+ latestTableSchemaConverted = if (latestTableSchema.isPresent)
Some(latestTableSchema.get()) else None
+ }
+ safeCreateRDD(df, structName, recordNamespace, latestTableSchemaConverted);
+ }
+
+ def safeCreateRDD(df: DataFrame, structName: String, recordNamespace:
String, readerAvroSchemaOpt: Option[Schema]):
+ Tuple2[RDD[GenericRecord], RDD[String]] = {
+ val writerSchema = df.schema
+ val writerAvroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(writerSchema, structName,
recordNamespace)
+ val readerAvroSchema = readerAvroSchemaOpt.getOrElse(writerAvroSchema)
+ // We check whether passed in reader schema is identical to writer schema
to avoid costly serde loop of
+ // making Spark deserialize its internal representation [[InternalRow]]
into [[Row]] for subsequent conversion
+ // (and back)
+ val sameSchema = writerAvroSchema.equals(readerAvroSchema)
+ val (nullable, _) =
AvroConversionUtils.resolveAvroTypeNullability(writerAvroSchema)
+
+ // NOTE: We have to serialize Avro schema, and then subsequently parse it
on the executor node, since Spark
+ // serializer is not able to digest it
+ val writerAvroSchemaStr = writerAvroSchema.toString
+ // NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to
[[Row]] conversion
+
+ if (!sameSchema) {
+ val rdds: RDD[Either[GenericRecord, String]] =
df.queryExecution.toRdd.mapPartitions { rows =>
+ if (rows.isEmpty) {
+ Iterator.empty
+ } else {
+ val writerAvroSchema = new Schema.Parser().parse(writerAvroSchemaStr)
+ val convert =
AvroConversionUtils.createInternalRowToAvroConverter(writerSchema,
writerAvroSchema, nullable = nullable)
+ val rowDeserializer = getCatalystRowSerDe(writerSchema)
+ val transform: InternalRow => Either[GenericRecord, String] =
internalRow => try {
+ Left(HoodieAvroUtils.rewriteRecordDeep(convert(internalRow),
readerAvroSchema, true))
+ } catch {
+ case _: Throwable =>
+ val rdd =
df.sparkSession.sparkContext.parallelize(Seq(rowDeserializer.deserializeRow(internalRow)))
+ Right(df.sqlContext.createDataFrame(rdd,
writerSchema).toJSON.first())
Review Comment:
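Is this the right way to create JSON from a Row? This fallback runs inside
`mapPartitions`, i.e. on the executors, where `SparkContext`/`SparkSession`
cannot be used (Spark does not support nested RDD operations, see SPARK-5063),
so calling `parallelize` and `createDataFrame(...).toJSON` per failed row will
not work there. `Row` provides a `json()` method, but it is only available from
Spark 3.0 onward.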
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]