yihua commented on a change in pull request #4789:
URL: https://github.com/apache/hudi/pull/4789#discussion_r813265854



##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -69,7 +67,7 @@ protected BuiltinKeyGenerator(TypedProperties config) {
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public String getRecordKey(Row row) {
     if (null == converterFn) {
-      converterFn = AvroConversionHelper.createConverterToAvro(row.schema(), 
STRUCT_NAME, NAMESPACE);
+      converterFn = AvroConversionUtils.createConverterToAvro(row.schema(), 
STRUCT_NAME, NAMESPACE);
     }
     GenericRecord genericRecord = (GenericRecord) converterFn.apply(row);
     return getKey(genericRecord).getRecordKey();

Review comment:
       nit: no need for type cast and directly do `return 
getKey(converterFn.apply(row)).getRecordKey();`?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java
##########
@@ -84,7 +82,7 @@ public String getRecordKey(Row row) {
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public String getPartitionPath(Row row) {
     if (null == converterFn) {
-      converterFn = AvroConversionHelper.createConverterToAvro(row.schema(), 
STRUCT_NAME, NAMESPACE);
+      converterFn = AvroConversionUtils.createConverterToAvro(row.schema(), 
STRUCT_NAME, NAMESPACE);
     }
     GenericRecord genericRecord = (GenericRecord) converterFn.apply(row);
     return getKey(genericRecord).getPartitionPath();

Review comment:
       same here

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -323,32 +322,60 @@ private object HoodieMergeOnReadRDD {
 
   def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: 
Configuration): HoodieMergedLogRecordScanner = {
     val fs = FSUtils.getFs(split.tablePath, config)
-    val partitionPath: String = if (split.logPaths.isEmpty || 
split.logPaths.get.asJava.isEmpty) {
-      null
+    val logFiles = split.logFiles.get
+
+    if (HoodieTableMetadata.isMetadataTable(split.tablePath)) {

Review comment:
       I need some clarification here.  So why is metadata table treated 
differently here?  It's basically another Hudi table.

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
##########
@@ -41,15 +54,105 @@ abstract class HoodieBaseRelation(
 
   protected val sparkSession: SparkSession = sqlContext.sparkSession
 
-  protected val tableAvroSchema: Schema = {
+  protected lazy val tableAvroSchema: Schema = {
     val schemaUtil = new TableSchemaResolver(metaClient)
-    Try 
(schemaUtil.getTableAvroSchema).getOrElse(SchemaConverters.toAvroType(userSchema.get))
+    Try(schemaUtil.getTableAvroSchema).getOrElse(
+      // If there is no commit in the table, we can't get the schema
+      // t/h [[TableSchemaResolver]], fallback to provided the [[userSchema]] 
instead.

Review comment:
       nit: typo  "fallback to provided the" -> "fallback to the provided"

##########
File path: 
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -41,8 +107,8 @@ object AvroConversionUtils {
         else {
           val schema = new Schema.Parser().parse(schemaStr)
           val dataType = convertAvroSchemaToStructType(schema)
-          val convertor = AvroConversionHelper.createConverterToRow(schema, 
dataType)
-          records.map { x => convertor(x).asInstanceOf[Row] }
+          val converter = createConverterToRow(schema, dataType)

Review comment:
       @nsivabalan @alexeykudinkin @xushiyan This is more like an internal 
conversion.  And looks like it is only used by read path?  So does this expose 
`InternalRow` in some way to public APIs and data written, i.e., if 
`InternalRow` changes in Spark, could it break Hudi read/write flows?
   
   End-to-end wise for write side, I think eventually we want to avoid Row/Avro 
conversion for Spark Row Writer path.  On the write side, I don't think it's a 
good idea to depend on `InternalRow` which is Spark SQL's internal abstraction, 
i.e., on the write side `Row`s should be passed along e2e when row abstraction 
is required.
   
   cc @vinothchandar @bvaradar 

##########
File path: 
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -18,20 +18,105 @@
 
 package org.apache.hudi
 
-import org.apache.avro.Schema
-import org.apache.avro.JsonProperties
+import org.apache.avro.Schema.Type
 import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, 
IndexedRecord}
+import org.apache.avro.{AvroRuntimeException, JsonProperties, Schema}
+import org.apache.hudi.HoodieSparkUtils.sparkAdapter
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.avro.SchemaConverters
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 
-import scala.collection.JavaConverters._
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 object AvroConversionUtils {
 
+  /**
+   * Check the nullability of the input Avro type and resolve it when it is 
nullable. The first
+   * return value is a [[Boolean]] indicating if the input Avro type is 
nullable. The second
+   * return value is either provided Avro type if it's not nullable, or its 
resolved non-nullable part
+   * in case it is
+   */
+  def resolveAvroTypeNullability(avroType: Schema): (Boolean, Schema) = {
+    if (avroType.getType == Type.UNION) {
+      val fields = avroType.getTypes.asScala
+      val actualType = fields.filter(_.getType != Type.NULL)
+      if (fields.length != 2 || actualType.length != 1) {
+        throw new AvroRuntimeException(
+          s"Unsupported Avro UNION type $avroType: Only UNION of a null type 
and a non-null " +
+            "type is supported")
+      }
+      (true, actualType.head)
+    } else {
+      (false, avroType)
+    }
+  }
+
+  /**
+   * Creates converter to transform Avro payload into Spark's Catalyst one
+   *
+   * @param rootAvroType Avro [[Schema]] to be transformed from
+   * @param rootCatalystType Catalyst [[StructType]] to be transformed into
+   * @return converter accepting Avro payload and transforming it into a 
Catalyst one (in the form of [[InternalRow]])
+   */
+  def createAvroToInternalRowConverter(rootAvroType: Schema, rootCatalystType: 
StructType): GenericRecord => Option[InternalRow] =
+    record => sparkAdapter.createAvroDeserializer(rootAvroType, 
rootCatalystType)
+      .deserialize(record)
+      .map(_.asInstanceOf[InternalRow])
+
+  /**
+   * Creates converter to transform Catalyst payload into Avro one
+   *
+   * @param rootCatalystType Catalyst [[StructType]] to be transformed from
+   * @param rootAvroType Avro [[Schema]] to be transformed into
+   * @param nullable whether Avro record is nullable
+   * @return converter accepting Catalyst payload (in the form of 
[[InternalRow]]) and transforming it into an Avro one
+   */
+  def createInternalRowToAvroConverter(rootCatalystType: StructType, 
rootAvroType: Schema, nullable: Boolean): InternalRow => GenericRecord = {
+    row => sparkAdapter.createAvroSerializer(rootCatalystType, rootAvroType, 
nullable)
+      .serialize(row)
+      .asInstanceOf[GenericRecord]
+  }
+
+  /**
+   * @deprecated please use 
[[AvroConversionUtils.createInternalRowToAvroConverter]]

Review comment:
       wrong ref to the new API?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to