This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch release-0.12.2
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 421f1cd6b75b3a9599edd0953273f606ac112e3d
Author: xiarixiaoyao <[email protected]>
AuthorDate: Wed Dec 7 10:38:54 2022 +0800

    [HUDI-5294] Support type change for schema on read + reconcile schema (#7326)
    
    * [HUDI-5294] Support type change for schema on read + reconcile schema
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |   2 +-
 .../hudi/common/util/InternalSchemaCache.java      |  26 ++++-
 .../schema/utils/AvroSchemaEvolutionUtils.java     |  33 ++++---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  32 +++++--
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  | 106 ++++++++++++++++++++-
 5 files changed, 172 insertions(+), 27 deletions(-)
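
For context before the per-file changes, here is a rough end-to-end sketch of what this patch enables, distilled from the new TestSpark3DDL case at the bottom of this diff: with schema-on-read enabled and hoodie.datasource.write.reconcile.schema=true, an upsert whose incoming DataFrame changes a column's type (fare cast from double to string) is reconciled into the table schema instead of failing Avro schema validation. The sketch assumes a spark-shell (Spark 3.1+) with the Hudi Spark bundle on the classpath; the table name and path are illustrative only.

    import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
    import org.apache.hudi.QuickstartUtils.{DataGenerator, convertToStringList, getQuickstartWriteConfigs}
    import org.apache.spark.sql.functions.expr
    import scala.collection.JavaConverters._

    val dataGen = new DataGenerator
    val basePath = "/tmp/hudi_type_change_demo"   // illustrative path

    // bootstrap the table with the quickstart schema (fare is a double here)
    val inserts = spark.read.json(spark.sparkContext.parallelize(
      convertToStringList(dataGen.generateInserts(10)).asScala, 2))
    inserts.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option("hoodie.schema.on.read.enable", "true").
      option("hoodie.table.name", "type_change_demo").
      mode("overwrite").
      save(basePath)

    // upsert with fare cast to string: with reconcile.schema enabled the type
    // change is applied to the table schema rather than rejected
    val updates = spark.read.json(spark.sparkContext.parallelize(
      convertToStringList(dataGen.generateUpdates(10)).asScala, 2)).
      withColumn("fare", expr("cast(fare as string)"))
    updates.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option("hoodie.schema.on.read.enable", "true").
      option("hoodie.datasource.write.reconcile.schema", "true").
      option("hoodie.table.name", "type_change_demo").
      mode("append").
      save(basePath)

    spark.read.format("hudi").load(basePath).printSchema()   // fare is now string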

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 609f85e27fe..4cec5d1b0c3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -293,7 +293,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
       InternalSchema internalSchema;
      Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema(), config.allowOperationMetadataField());
       if (historySchemaStr.isEmpty()) {
-        internalSchema = AvroInternalSchemaConverter.convert(avroSchema);
+        internalSchema = SerDeHelper.fromJson(config.getInternalSchema()).orElse(AvroInternalSchemaConverter.convert(avroSchema));
         internalSchema.setSchemaId(Long.parseLong(instantTime));
       } else {
         internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java b/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
index 846309b7b67..5f5a8763409 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.common.util;
 
+import org.apache.avro.Schema;
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -26,6 +28,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
 import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
 import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
@@ -166,9 +169,11 @@ public class InternalSchemaCache {
    * step1:
    * try to parser internalSchema from HoodieInstant directly
    * step2:
-   * if we cannot parser internalSchema in step1,
+   * if we cannot parse internalSchema in step1 (e.g. the HoodieInstant for the current versionId has been archived),
   * try to find internalSchema in historySchema.
-   *
+   * step3:
+   * if we cannot parse internalSchema in step2 (e.g. schema evolution was not enabled when the hoodie table was created, but was enabled later after some inserts),
+   * try to convert table schema to internalSchema.
    * @param versionId the internalSchema version to be search.
    * @param tablePath table path
    * @param hadoopConf conf
@@ -176,6 +181,7 @@ public class InternalSchemaCache {
    * @return a internalSchema.
    */
  public static InternalSchema getInternalSchemaByVersionId(long versionId, String tablePath, Configuration hadoopConf, String validCommits) {
+    String avroSchema = "";
    Set<String> commitSet = Arrays.stream(validCommits.split(",")).collect(Collectors.toSet());
     List<String> validateCommitList = commitSet.stream().map(fileName -> {
       String fileExtension = HoodieInstant.getTimelineFileExtension(fileName);
@@ -199,6 +205,7 @@ public class InternalSchemaCache {
         }
        HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
        String latestInternalSchemaStr = metadata.getMetadata(SerDeHelper.LATEST_SCHEMA);
+        avroSchema = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
         if (latestInternalSchemaStr != null) {
           return SerDeHelper.fromJson(latestInternalSchemaStr).orElse(null);
         }
@@ -209,8 +216,19 @@ public class InternalSchemaCache {
     }
     // step2:
    FileBasedInternalSchemaStorageManager fileBasedInternalSchemaStorageManager = new FileBasedInternalSchemaStorageManager(hadoopConf, new Path(tablePath));
-    String lastestHistorySchema = fileBasedInternalSchemaStorageManager.getHistorySchemaStrByGivenValidCommits(validateCommitList);
-    return InternalSchemaUtils.searchSchema(versionId, SerDeHelper.parseSchemas(lastestHistorySchema));
+    String latestHistorySchema = fileBasedInternalSchemaStorageManager.getHistorySchemaStrByGivenValidCommits(validateCommitList);
+    if (latestHistorySchema.isEmpty()) {
+      return InternalSchema.getEmptyInternalSchema();
+    }
+    InternalSchema fileSchema = InternalSchemaUtils.searchSchema(versionId, SerDeHelper.parseSchemas(latestHistorySchema));
+    // step3:
+    return fileSchema.isEmptySchema() ? AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(avroSchema))) : fileSchema;
+  }
+
+  public static InternalSchema getInternalSchemaByVersionId(long versionId, HoodieTableMetaClient metaClient) {
+    String validCommitLists = metaClient
+        .getCommitsAndCompactionTimeline().filterCompletedInstants().getInstantsAsStream().map(HoodieInstant::getFileName).collect(Collectors.joining(","));
+    return getInternalSchemaByVersionId(versionId, metaClient.getBasePathV2().toString(), metaClient.getHadoopConf(), validCommitLists);
   }
 }
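
The updated javadoc above describes a three-step fallback: parse the InternalSchema from the matching HoodieInstant's commit metadata, fall back to the persisted schema history, and finally (new in this patch) fall back to converting the table's latest Avro write schema. A minimal sketch of resolving a schema through the new HoodieTableMetaClient overload added here; the base path is illustrative and an existing Hudi table is assumed:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.hudi.common.util.InternalSchemaCache

    val metaClient = HoodieTableMetaClient.builder()
      .setConf(new Configuration())                  // or the Spark session's hadoopConfiguration
      .setBasePath("/tmp/hudi_type_change_demo")     // illustrative path
      .build()

    // the schema version id is the commit time of the instant whose schema we want;
    // the new overload collects the completed instants itself and then delegates to
    // getInternalSchemaByVersionId(versionId, tablePath, hadoopConf, validCommits)
    val versionId = metaClient.getActiveTimeline.getCommitsTimeline
      .filterCompletedInstants().lastInstant().get().getTimestamp.toLong
    val schema = InternalSchemaCache.getInternalSchemaByVersionId(versionId, metaClient)
    println(schema)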
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
index e2b33915853..2561fc507e4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
@@ -19,13 +19,11 @@
 package org.apache.hudi.internal.schema.utils;
 
 import org.apache.hudi.internal.schema.InternalSchema;
-import org.apache.hudi.internal.schema.Types;
 import org.apache.hudi.internal.schema.action.TableChanges;
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
 
 import org.apache.avro.Schema;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
@@ -41,7 +39,8 @@ public class AvroSchemaEvolutionUtils {
   * 2) incoming data contains new columns not defined yet in the table -> columns will be added to the table schema (incoming dataframe?)
   * 3) incoming data has missing columns that are already defined in the table and new columns not yet defined in the table ->
   *     new columns will be added to the table schema, missing columns will be injected with null values
-   * 4) support nested schema change.
+   * 4) support type change
+   * 5) support nested schema change.
    * Notice:
    *    the incoming schema should not have delete/rename semantics.
   *    for example: incoming schema:  int a, int b, int d;   oldTableSchema int a, int b, int c, int d
@@ -52,25 +51,30 @@ public class AvroSchemaEvolutionUtils {
    */
  public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSchema oldTableSchema) {
    InternalSchema inComingInternalSchema = AvroInternalSchemaConverter.convert(incomingSchema);
-    // do check, only support add column evolution
+    // check column add/missing
    List<String> colNamesFromIncoming = inComingInternalSchema.getAllColsFullName();
    List<String> colNamesFromOldSchema = oldTableSchema.getAllColsFullName();
    List<String> diffFromOldSchema = colNamesFromOldSchema.stream().filter(f -> !colNamesFromIncoming.contains(f)).collect(Collectors.toList());
-    List<Types.Field> newFields = new ArrayList<>();
-    if (colNamesFromIncoming.size() == colNamesFromOldSchema.size() && diffFromOldSchema.size() == 0) {
+    List<String> diffFromEvolutionColumns = colNamesFromIncoming.stream().filter(f -> !colNamesFromOldSchema.contains(f)).collect(Collectors.toList());
+    // check type change.
+    List<String> typeChangeColumns = colNamesFromIncoming
+        .stream()
+        .filter(f -> colNamesFromOldSchema.contains(f) && !inComingInternalSchema.findType(f).equals(oldTableSchema.findType(f)))
+        .collect(Collectors.toList());
+    if (colNamesFromIncoming.size() == colNamesFromOldSchema.size() && diffFromOldSchema.size() == 0 && typeChangeColumns.isEmpty()) {
       return oldTableSchema;
     }
-    List<String> diffFromEvolutionSchema = colNamesFromIncoming.stream().filter(f -> !colNamesFromOldSchema.contains(f)).collect(Collectors.toList());
+
     // Remove redundancy from diffFromEvolutionSchema.
    // for example, now we add a struct col in evolvedSchema, the struct col is " user struct<name:string, age:int> "
    // when we do diff operation: user, user.name, user.age will appeared in the resultSet which is redundancy, user.name and user.age should be excluded.
     // deal with add operation
     TreeMap<Integer, String> finalAddAction = new TreeMap<>();
-    for (int i = 0; i < diffFromEvolutionSchema.size(); i++)  {
-      String name = diffFromEvolutionSchema.get(i);
+    for (int i = 0; i < diffFromEvolutionColumns.size(); i++)  {
+      String name = diffFromEvolutionColumns.get(i);
       int splitPoint = name.lastIndexOf(".");
       String parentName = splitPoint > 0 ? name.substring(0, splitPoint) : "";
-      if (!parentName.isEmpty() && diffFromEvolutionSchema.contains(parentName)) {
+      if (!parentName.isEmpty() && diffFromEvolutionColumns.contains(parentName)) {
         // find redundancy, skip it
         continue;
       }
@@ -94,7 +98,14 @@ public class AvroSchemaEvolutionUtils {
       inferPosition.map(i -> addChange.addPositionChange(name, i, "before"));
     });
 
-    return SchemaChangeUtils.applyTableChanges2Schema(oldTableSchema, addChange);
+    // do type evolution.
+    InternalSchema internalSchemaAfterAddColumns = SchemaChangeUtils.applyTableChanges2Schema(oldTableSchema, addChange);
+    TableChanges.ColumnUpdateChange typeChange = TableChanges.ColumnUpdateChange.get(internalSchemaAfterAddColumns);
+    typeChangeColumns.stream().filter(f -> !inComingInternalSchema.findType(f).isNestedType()).forEach(col -> {
+      typeChange.updateColumnType(col, inComingInternalSchema.findType(col));
+    });
+
+    return SchemaChangeUtils.applyTableChanges2Schema(internalSchemaAfterAddColumns, typeChange);
   }
 
   /**
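
The reconcileSchema javadoc above now covers type changes in addition to added and missing columns. A small sketch of the intent on plain Avro schemas follows; field names and the expected outcome are illustrative, based on the rules the javadoc describes (nested type changes are filtered out by the code above):

    import org.apache.avro.Schema
    import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
    import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils

    // current table schema: a int, b double, c string
    val tableAvro = new Schema.Parser().parse(
      """{"type":"record","name":"rec","fields":[
        |  {"name":"a","type":"int"},
        |  {"name":"b","type":"double"},
        |  {"name":"c","type":"string"}]}""".stripMargin)

    // incoming schema: c is missing, b changed to string, d added
    val incomingAvro = new Schema.Parser().parse(
      """{"type":"record","name":"rec","fields":[
        |  {"name":"a","type":"int"},
        |  {"name":"b","type":"string"},
        |  {"name":"d","type":"long"}]}""".stripMargin)

    val reconciled = AvroSchemaEvolutionUtils.reconcileSchema(
      incomingAvro, AvroInternalSchemaConverter.convert(tableAvro))
    // expected per the javadoc: a int, b string (type change applied),
    // c string (kept; missing values are injected as null), d long (appended)
    println(reconciled)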
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 0122576a679..9442195c608 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -258,8 +258,10 @@ object HoodieSparkSqlWriter {
               if (reconcileSchema) {
                // In case we need to reconcile the schema and schema evolution is enabled,
                 // we will force-apply schema evolution to the writer's schema
-                if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
-                  internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(sourceSchema))
+                if (schemaEvolutionEnabled) {
+                  // in case sourceSchema contains HoodieRecord.HOODIE_META_COLUMNS
+                  val allowOperationMetaDataField = parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean
+                  AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(latestTableSchema, allowOperationMetaDataField))
                 }
 
                 if (internalSchemaOpt.isDefined) {
@@ -314,7 +316,7 @@ object HoodieSparkSqlWriter {
             // Create a HoodieWriteClient & issue the write.
 
            val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writerDataSchema.toString, path,
-              tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
+              tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt, Some(writerDataSchema)) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
            )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]

            if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
@@ -399,10 +401,27 @@ object HoodieSparkSqlWriter {
     processedRecord
   }
 
-  def addSchemaEvolutionParameters(parameters: Map[String, String], internalSchemaOpt: Option[InternalSchema]): Map[String, String] = {
+  def addSchemaEvolutionParameters(parameters: Map[String, String], internalSchemaOpt: Option[InternalSchema], writeSchemaOpt: Option[Schema] = None): Map[String, String] = {
    val schemaEvolutionEnable = if (internalSchemaOpt.isDefined) "true" else "false"
-    parameters ++ Map(HoodieWriteConfig.INTERNAL_SCHEMA_STRING.key() -> SerDeHelper.toJson(internalSchemaOpt.getOrElse(null)),
-      HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key() -> schemaEvolutionEnable)
+
+    val schemaValidateEnable = if (schemaEvolutionEnable.toBoolean && parameters.getOrDefault(DataSourceWriteOptions.RECONCILE_SCHEMA.key(), "false").toBoolean) {
+      // force disable schema validate, now we support schema evolution, no need to do validate
+      "false"
+    } else  {
+      parameters.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key(), "true")
+    }
+    // correct internalSchema, internalSchema should contain hoodie metadata columns.
+    val correctInternalSchema = internalSchemaOpt.map { internalSchema =>
+      if (internalSchema.findField(HoodieRecord.RECORD_KEY_METADATA_FIELD) == null && writeSchemaOpt.isDefined) {
+        val allowOperationMetaDataField = parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), "false").toBoolean
+        AvroInternalSchemaConverter.convert(HoodieAvroUtils.addMetadataFields(writeSchemaOpt.get, allowOperationMetaDataField))
+      } else {
+        internalSchema
+      }
+    }
+    parameters ++ Map(HoodieWriteConfig.INTERNAL_SCHEMA_STRING.key() -> SerDeHelper.toJson(correctInternalSchema.getOrElse(null)),
+      HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key() -> schemaEvolutionEnable,
+      HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key()  -> schemaValidateEnable)
   }
 
   /**
@@ -411,7 +430,6 @@ object HoodieSparkSqlWriter {
     * @param fs           instance of FileSystem.
     * @param basePath     base path.
     * @param sparkContext instance of spark context.
-    * @param schema       incoming record's schema.
    * @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required.
     */
  def getLatestTableInternalSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[InternalSchema] = {
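
On the writer side, the net effect of addSchemaEvolutionParameters is easiest to read as configuration: when both schema-on-read and reconcile-schema are enabled, the serialized InternalSchema (now guaranteed to include the _hoodie_* metadata fields) is passed to the write client via HoodieWriteConfig.INTERNAL_SCHEMA_STRING, and up-front Avro schema validation is switched off. A sketch of the resulting options; the key names come from the code above, and the wiring shown is behavioural, not the implementation:

    // user-facing options
    val userOpts = Map(
      "hoodie.schema.on.read.enable" -> "true",             // HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE
      "hoodie.datasource.write.reconcile.schema" -> "true"  // DataSourceWriteOptions.RECONCILE_SCHEMA
    )
    // options the writer now adds itself before creating the write client:
    //   HoodieWriteConfig.INTERNAL_SCHEMA_STRING      -> JSON of the InternalSchema incl. metadata fields
    //   HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE -> "false" (reconciliation replaces the up-front check)
    userOpts.foreach { case (k, v) => println(s"$k=$v") }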
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 9d955cb8310..ed9db3a5aa4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -18,13 +18,16 @@
 package org.apache.spark.sql.hudi
 
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY, TABLE_NAME}
+import org.apache.hudi.QuickstartUtils.{DataGenerator, convertToStringList, getQuickstartWriteConfigs}
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.functions.{arrays_zip, col}
+import org.apache.spark.sql.functions.{arrays_zip, col, expr, lit}
+import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 
 import scala.collection.JavaConversions._
@@ -511,7 +514,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
     }
   }
 
-  test("Test schema auto evolution") {
+  test("Test schema auto evolution complex") {
     withTempDir { tmp =>
       Seq("COPY_ON_WRITE", "MERGE_ON_READ").foreach { tableType =>
         val tableName = generateTableName
@@ -534,7 +537,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
             DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
             DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
             "hoodie.schema.on.read.enable" -> "true",
-            "hoodie.datasource.write.reconcile.schema" -> "true",
             DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY -> "true"
           )
 
@@ -546,7 +548,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
             .save(tablePath)
 
           val oldView = spark.read.format("hudi").load(tablePath)
-          oldView.show(false)
+          oldView.show(5, false)
 
          val records2 = RawTripTestPayload.recordsToStrings(dataGen.generateUpdatesAsPerSchema("002", 100, schema)).toList
          val inputD2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
@@ -571,4 +573,100 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test schema auto evolution") {
+    withTempDir { tmp =>
+      Seq("COPY_ON_WRITE", "MERGE_ON_READ").foreach { tableType =>
+        // for complex schema.
+        val tableName = generateTableName
+        val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
+        if (HoodieSparkUtils.gteqSpark3_1) {
+          val dataGen = new DataGenerator
+          val inserts = convertToStringList(dataGen.generateInserts(10))
+          val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+          df.write.format("hudi").
+            options(getQuickstartWriteConfigs).
+            option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
+            option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+            option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+            option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+            option("hoodie.schema.on.read.enable","true").
+            option(TABLE_NAME.key(), tableName).
+            option("hoodie.table.name", tableName).
+            mode("overwrite").
+            save(tablePath)
+
+          val updates = convertToStringList(dataGen.generateUpdates(10))
+          // type change: fare (double -> String)
+          // add new column and drop a column
+          val dfUpdate = spark.read.json(spark.sparkContext.parallelize(updates, 2))
+            .withColumn("fare", expr("cast(fare as string)"))
+            .withColumn("addColumn", lit("new"))
+          dfUpdate.drop("begin_lat").write.format("hudi").
+            options(getQuickstartWriteConfigs).
+            option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType).
+            option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+            option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+            option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+            option("hoodie.schema.on.read.enable","true").
+            option("hoodie.datasource.write.reconcile.schema","true").
+            option(TABLE_NAME.key(), tableName).
+            option("hoodie.table.name", tableName).
+            mode("append").
+            save(tablePath)
+          spark.sql("set hoodie.schema.on.read.enable=true")
+
+          val snapshotDF = spark.read.format("hudi").load(tablePath)
+
+          assertResult(StringType)(snapshotDF.schema.fields.filter(_.name == "fare").head.dataType)
+          assertResult("addColumn")(snapshotDF.schema.fields.last.name)
+          val checkRowKey = dfUpdate.select("fare").collectAsList().map(_.getString(0)).get(0)
+          snapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
+          checkAnswer(spark.sql(s"select fare, addColumn from  hudi_trips_snapshot where fare = ${checkRowKey}").collect())(
+            Seq(checkRowKey, "new")
+          )
+
+          spark.sql(s"select * from  hudi_trips_snapshot").show(false)
+          //  test insert_over_write  + update again
+          val overwrite = convertToStringList(dataGen.generateInserts(10))
+          val dfOverWrite = spark.
+            read.json(spark.sparkContext.parallelize(overwrite, 2)).
+            filter("partitionpath = 'americas/united_states/san_francisco'")
+            .withColumn("fare", expr("cast(fare as string)")) // fare now in table is string type, we forbid convert string to double.
+          dfOverWrite.write.format("hudi").
+            options(getQuickstartWriteConfigs).
+            option("hoodie.datasource.write.operation","insert_overwrite").
+            option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+            option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+            option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+            option("hoodie.schema.on.read.enable","true").
+            option("hoodie.datasource.write.reconcile.schema","true").
+            option(TABLE_NAME.key(), tableName).
+            option("hoodie.table.name", tableName).
+            mode("append").
+            save(tablePath)
+          spark.read.format("hudi").load(tablePath).show(false)
+
+          val updatesAgain = convertToStringList(dataGen.generateUpdates(10))
+          val dfAgain = spark.read.json(spark.sparkContext.parallelize(updatesAgain, 2)).withColumn("fare", expr("cast(fare as string)"))
+          dfAgain.write.format("hudi").
+            options(getQuickstartWriteConfigs).
+            option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+            option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+            option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+            option("hoodie.schema.on.read.enable","true").
+            option("hoodie.datasource.write.reconcile.schema","true").
+            option(TABLE_NAME.key(), tableName).
+            option("hoodie.table.name", tableName).
+            mode("append").
+            save(tablePath)
+          spark.read.format("hudi").load(tablePath).createOrReplaceTempView("hudi_trips_snapshot1")
+          val checkKey = dfAgain.select("fare").collectAsList().map(_.getString(0)).get(0)
+          checkAnswer(spark.sql(s"select fare, addColumn from  hudi_trips_snapshot1 where fare = ${checkKey}").collect())(
+            Seq(checkKey, null)
+          )
+        }
+      }
+    }
+  }
 }
