zhedoubushishi commented on a change in pull request #2208:
URL: https://github.com/apache/hudi/pull/2208#discussion_r517548998
##########
File path:
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -24,35 +24,13 @@ import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.model.HoodieKey
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.SchemaConverters
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import scala.collection.JavaConverters._
object AvroConversionUtils {
- def createRdd(df: DataFrame, structName: String, recordNamespace: String):
RDD[GenericRecord] = {
- val avroSchema = convertStructTypeToAvroSchema(df.schema, structName,
recordNamespace)
- createRdd(df, avroSchema, structName, recordNamespace)
- }
-
- def createRdd(df: DataFrame, avroSchema: Schema, structName: String,
recordNamespace: String)
- : RDD[GenericRecord] = {
- // Use the Avro schema to derive the StructType which has the correct
nullability information
- val dataType =
SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
- val encoder = RowEncoder.apply(dataType).resolveAndBind()
- val deserializer = HoodieSparkUtils.createDeserializer(encoder)
- df.queryExecution.toRdd.map(row => deserializer.deserializeRow(row))
- .mapPartitions { records =>
- if (records.isEmpty) Iterator.empty
- else {
- val convertor = AvroConversionHelper.createConverterToAvro(dataType,
structName, recordNamespace)
- records.map { x => convertor(x).asInstanceOf[GenericRecord] }
- }
- }
- }
-
def createRddForDeletes(df: DataFrame, rowField: String, partitionField:
String): RDD[HoodieKey] = {
Review comment:
Good point. It seems ```createRddForDeletes``` is never used in Hudi,
so I'll just remove it.
```createDataFrame``` is tricky because it is used in the
```hudi-spark-client``` module, so I cannot move it to the ```hudi-spark``` module.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]