vinothchandar commented on a change in pull request #2927:
URL: https://github.com/apache/hudi/pull/2927#discussion_r683796125



##########
File path: 
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -92,22 +92,45 @@ object HoodieSparkUtils extends SparkAdapterSupport {
     new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, 
fileStatusCache)
   }
 
-  def createRdd(df: DataFrame, structName: String, recordNamespace: String): 
RDD[GenericRecord] = {
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, 
upgradeToLatestSchemaIfNeeded: Boolean): RDD[GenericRecord] = {
+    createRdd(df, null, structName, recordNamespace, 
upgradeToLatestSchemaIfNeeded)
+  }
+
+  def createRdd(df: DataFrame, latestSchema: Schema, structName: String, 
recordNamespace: String, upgradeToLatestSchemaIfNeeded: Boolean): 
RDD[GenericRecord] = {

Review comment:
       you can just use default values, without having to create method 
overloads in scala.

##########
File path: 
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -92,22 +92,45 @@ object HoodieSparkUtils extends SparkAdapterSupport {
     new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, 
fileStatusCache)
   }
 
-  def createRdd(df: DataFrame, structName: String, recordNamespace: String): 
RDD[GenericRecord] = {
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, 
upgradeToLatestSchemaIfNeeded: Boolean): RDD[GenericRecord] = {
+    createRdd(df, null, structName, recordNamespace, 
upgradeToLatestSchemaIfNeeded)
+  }
+
+  def createRdd(df: DataFrame, latestSchema: Schema, structName: String, 
recordNamespace: String, upgradeToLatestSchemaIfNeeded: Boolean): 
RDD[GenericRecord] = {
     val avroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, 
recordNamespace)
-    createRdd(df, avroSchema, structName, recordNamespace)
+    // if upgradeToLatestSchemaIfNeeded is set to true and latestSchema is not 
null, then try to leverage latestSchema

Review comment:
       can we just determine the latestSchema here with something like and make 
one call to `createRdd` 
   
   ```
   val reconciledSchema /* or something like that */ = if (...) else ..  
   ```

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -414,6 +414,13 @@ public static void 
initializeBootstrapDirsIfNotExists(Configuration hadoopConf,
     return fs.listStatus(metaPath, nameFilter);
   }
 
+  /**
+   * @return {@code true} if any commits are found, else {@code false}.
+   */
+  public boolean isTimelineNonEmpty() {
+    return 
getCommitsTimeline().getInstants().collect(Collectors.toList()).size() > 0;

Review comment:
       double check : that this just returns completed instants.

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -286,6 +286,16 @@ object DataSourceWriteOptions {
     .defaultValue(classOf[HiveSyncTool].getName)
     .withDocumentation("Sync tool class name used to sync to metastore. 
Defaults to Hive.")
 
+  val HANDLE_SCHEMA_MISMATCH: ConfigProperty[Boolean] = ConfigProperty
+    .key("hoodie.datasource.write.handle.schema.mismatch")

Review comment:
       lets make this consist with every boolean flag used to denote this. So 
either this becoms `hoodie.datasource.write.reconcile.schema` or the variable 
is `handleSchemaMismatch` 

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
##########
@@ -40,8 +40,8 @@ public SparkAvroPostProcessor(TypedProperties props, 
JavaSparkContext jssc) {
 
   @Override
   public Schema processSchema(Schema schema) {
-    return AvroConversionUtils.convertStructTypeToAvroSchema(
+    return schema != null ? AvroConversionUtils.convertStructTypeToAvroSchema(

Review comment:
       null?

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -391,11 +395,14 @@ public void refreshTimeline() throws IOException {
             transformed
                 .map(r -> (SchemaProvider) new DelegatingSchemaProvider(props, 
jssc,
                     dataAndCheckpoint.getSchemaProvider(),
-                    UtilHelpers.createRowBasedSchemaProvider(r.schema(), 
props, jssc)))
+                    handleSchemaMismatch

Review comment:
       please pull the schema determination into a separate variable above
   

##########
File path: 
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -92,22 +92,45 @@ object HoodieSparkUtils extends SparkAdapterSupport {
     new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, 
fileStatusCache)
   }
 
-  def createRdd(df: DataFrame, structName: String, recordNamespace: String): 
RDD[GenericRecord] = {
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, 
upgradeToLatestSchemaIfNeeded: Boolean): RDD[GenericRecord] = {
+    createRdd(df, null, structName, recordNamespace, 
upgradeToLatestSchemaIfNeeded)
+  }
+
+  def createRdd(df: DataFrame, latestSchema: Schema, structName: String, 
recordNamespace: String, upgradeToLatestSchemaIfNeeded: Boolean): 
RDD[GenericRecord] = {
     val avroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, 
recordNamespace)

Review comment:
       you probably want to rename this to sth like `writeSchema` - given this 
is the schema of the incoming write. and latestSchema is probably just the 
`latestTableSchema` right

##########
File path: 
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -92,22 +92,45 @@ object HoodieSparkUtils extends SparkAdapterSupport {
     new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, 
fileStatusCache)
   }
 
-  def createRdd(df: DataFrame, structName: String, recordNamespace: String): 
RDD[GenericRecord] = {
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, 
upgradeToLatestSchemaIfNeeded: Boolean): RDD[GenericRecord] = {
+    createRdd(df, null, structName, recordNamespace, 
upgradeToLatestSchemaIfNeeded)

Review comment:
       `null` as sentinel?

##########
File path: 
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -92,22 +92,45 @@ object HoodieSparkUtils extends SparkAdapterSupport {
     new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, 
fileStatusCache)
   }
 
-  def createRdd(df: DataFrame, structName: String, recordNamespace: String): 
RDD[GenericRecord] = {
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, 
upgradeToLatestSchemaIfNeeded: Boolean): RDD[GenericRecord] = {
+    createRdd(df, null, structName, recordNamespace, 
upgradeToLatestSchemaIfNeeded)
+  }
+
+  def createRdd(df: DataFrame, latestSchema: Schema, structName: String, 
recordNamespace: String, upgradeToLatestSchemaIfNeeded: Boolean): 
RDD[GenericRecord] = {
     val avroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, 
recordNamespace)
-    createRdd(df, avroSchema, structName, recordNamespace)
+    // if upgradeToLatestSchemaIfNeeded is set to true and latestSchema is not 
null, then try to leverage latestSchema
+    // this code path will handle situations where records are serialized in 
schema1, but callers wish to convert to
+    // Rdd[GenericRecord] using different schema(could be evolved schema or 
could be latest table schema)
+    if (upgradeToLatestSchemaIfNeeded && latestSchema != null) {
+      createRdd(df, avroSchema, latestSchema, structName, recordNamespace)
+    } else {
+      // there are paths where callers wish to use latestSchema to convert to 
Rdd[GenericRecords] and not use row's schema
+      // So use latestSchema is not null. if not available, fallback to using 
row's schema.
+      createRdd(df, if(latestSchema != null) latestSchema else avroSchema, 
null, structName, recordNamespace)
+    }
   }
 
-  def createRdd(df: DataFrame, avroSchema: Schema, structName: String, 
recordNamespace: String)
+  def createRdd(df: DataFrame, avroSchema: Schema, latestSchema: Schema, 
structName: String, recordNamespace: String)
   : RDD[GenericRecord] = {
     // Use the Avro schema to derive the StructType which has the correct 
nullability information
     val dataType = 
SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
     val encoder = RowEncoder.apply(dataType).resolveAndBind()
     val deserializer = sparkAdapter.createSparkRowSerDe(encoder)
+    // if records were serialized with old schema, but an evolved schema was 
passed in with latestSchema, we need
+    // latestSchema equivalent datatype to be passed in to 
AvroConversionHelper.createConverterToAvro()
+    val latestDataType =
+      if (latestSchema != null) 
SchemaConverters.toSqlType(latestSchema).dataType.asInstanceOf[StructType] else 
null
+    // Note: deserializer.deserializeRow(row) is not capable of handling 
evolved schema. i.e. if Row was serialized in
+    // old schema, but deserializer was created with an encoder with evolved 
schema, deserialization fails.
+    // Hence we always need to deserialize in the same schema as serialized 
schema.
     df.queryExecution.toRdd.map(row => deserializer.deserializeRow(row))
       .mapPartitions { records =>
         if (records.isEmpty) Iterator.empty
         else {
-          val convertor = AvroConversionHelper.createConverterToAvro(dataType, 
structName, recordNamespace)
+          // if records were serialized with old schema, but an evolved schema 
was passed in with latestSchema, we need
+          // latestSchema equivalent datatype to be passed in to 
AvroConversionHelper.createConverterToAvro()
+          val convertor = AvroConversionHelper.createConverterToAvro(
+            if (latestDataType != null) latestDataType else dataType, 
structName, recordNamespace)

Review comment:
       same naming terminology? writeStructType, latestTableStructType

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
##########
@@ -425,11 +429,53 @@ public static SchemaProviderWithPostProcessor 
wrapSchemaProviderWithPostProcesso
   }
 
   public static SchemaProvider createRowBasedSchemaProvider(StructType 
structType,
-      TypedProperties cfg, JavaSparkContext jssc) {
+                                                            TypedProperties 
cfg, JavaSparkContext jssc) {
     SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType);
     return wrapSchemaProviderWithPostProcessor(rowSchemaProvider, cfg, jssc, 
null);
   }
 
+  /**
+   * Create latest schema provider for Target schema.
+   *
+   * @param structType spark data type of incoming batch.
+   * @param jssc       instance of {@link JavaSparkContext}.
+   * @param fs         instance of {@link FileSystem}.
+   * @param basePath   base path of the table.
+   * @return the schema provider where target schema refers to latest 
schema(either incoming schema or table schema).
+   */
+  public static SchemaProvider createLatestSchemaProvider(StructType 
structType,
+                                                          JavaSparkContext 
jssc, FileSystem fs, String basePath) {
+    SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType);
+    Schema incomingSchema = rowSchemaProvider.getTargetSchema();
+    Schema latestSchema = incomingSchema;
+
+    try {
+      if (FSUtils.isTableExists(basePath, fs)) {
+        HoodieTableMetaClient tableMetaClient = 
HoodieTableMetaClient.builder().setConf(jssc.sc().hadoopConfiguration()).setBasePath(basePath).build();
+        TableSchemaResolver
+            tableSchemaResolver = new TableSchemaResolver(tableMetaClient);
+        latestSchema = tableSchemaResolver.getLatestSchema(incomingSchema, 
true, (Function1<Schema, Schema>) v1 -> 
AvroConversionUtils.convertStructTypeToAvroSchema(
+            AvroConversionUtils.convertAvroSchemaToStructType(v1), 
RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME,
+            RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE));
+      }
+    } catch (IOException e) {
+      LOG.warn("Could not fetch table schema. Falling back to writer schema");
+    }
+
+    final Schema finalLatestSchema = latestSchema;
+    return new SchemaProvider(null) {

Review comment:
       null?

##########
File path: 
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -92,22 +92,45 @@ object HoodieSparkUtils extends SparkAdapterSupport {
     new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, 
fileStatusCache)
   }
 
-  def createRdd(df: DataFrame, structName: String, recordNamespace: String): 
RDD[GenericRecord] = {
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, 
upgradeToLatestSchemaIfNeeded: Boolean): RDD[GenericRecord] = {
+    createRdd(df, null, structName, recordNamespace, 
upgradeToLatestSchemaIfNeeded)
+  }
+
+  def createRdd(df: DataFrame, latestSchema: Schema, structName: String, 
recordNamespace: String, upgradeToLatestSchemaIfNeeded: Boolean): 
RDD[GenericRecord] = {

Review comment:
       rename `upgrade` to `reconcile`  and drop `IfNeeded`. just 
`reconcileToLatestSchema`

##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
##########
@@ -425,11 +429,53 @@ public static SchemaProviderWithPostProcessor 
wrapSchemaProviderWithPostProcesso
   }
 
   public static SchemaProvider createRowBasedSchemaProvider(StructType 
structType,
-      TypedProperties cfg, JavaSparkContext jssc) {
+                                                            TypedProperties 
cfg, JavaSparkContext jssc) {
     SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType);
     return wrapSchemaProviderWithPostProcessor(rowSchemaProvider, cfg, jssc, 
null);
   }
 
+  /**
+   * Create latest schema provider for Target schema.
+   *
+   * @param structType spark data type of incoming batch.
+   * @param jssc       instance of {@link JavaSparkContext}.
+   * @param fs         instance of {@link FileSystem}.
+   * @param basePath   base path of the table.
+   * @return the schema provider where target schema refers to latest 
schema(either incoming schema or table schema).
+   */
+  public static SchemaProvider createLatestSchemaProvider(StructType 
structType,
+                                                          JavaSparkContext 
jssc, FileSystem fs, String basePath) {
+    SchemaProvider rowSchemaProvider = new RowBasedSchemaProvider(structType);
+    Schema incomingSchema = rowSchemaProvider.getTargetSchema();

Review comment:
       writeSchema/tableSchema?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to