nsivabalan commented on code in PR #12646:
URL: https://github.com/apache/hudi/pull/12646#discussion_r1929119643


##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java:
##########
@@ -377,4 +381,20 @@ private static HoodieDataBlock 
getDataBlock(HoodieLogBlock.HoodieLogBlockType da
         throw new RuntimeException("Unknown data block type " + dataBlockType);
     }
   }
+
+  protected byte[] getCommitMetadata(String basePath, String partition, String 
commitTs, int count, Map<String, String> extraMetadata)
+      throws IOException {
+    HoodieCommitMetadata commit = new HoodieCommitMetadata();

Review Comment:
   If we want some commit metadata to test schema evolution cases, we can 
define a static HoodieCommitMetadata, override only the extra metadata, and 
serialize it on an as-needed basis. This is a minor optimization.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala:
##########
@@ -1291,6 +1294,62 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
     })
   }
 
+  test("Test partial insert with inline clustering") {
+    withRecordType()(withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      val tableType = "mor"
+      val logDataBlockFormat = "parquet"
+      withSparkSqlSessionConfig(
+          HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key -> "0",
+          DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key -> 
"true",
+          HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> 
logDataBlockFormat,
+          HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key -> "true",
+          HoodieClusteringConfig.INLINE_CLUSTERING.key -> "true",
+          HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key -> "2",
+          HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key -> "id,price") 
{
+        spark.sql(
+          s"""
+             | create table $tableName (
+             |   id int,
+             |   name string,
+             |   price long,
+             |   ts long,
+             |   description string
+             | ) using hudi
+             | tblproperties(
+             |   type ='$tableType',
+             |   primaryKey = 'id',
+             |   preCombineField = 'ts'
+             | )
+             | location '$basePath'
+            """.stripMargin)
+        spark.sql(s"insert into $tableName values " +
+          "(1, 'a1', 10, 1000, 'a1: desc1')," +
+          "(2, 'a2', 20, 1200, 'a2: desc2'), " +
+          "(3, 'a3', 30.0, 1250, 'a3: desc3')")
+
+        // Partial updates using MERGE INTO statement with changed fields: 
"price" and "ts"
+        spark.sql(
+          s"""
+             | merge into $tableName t0
+             | using (
+             |   select 1 as id, 'a1' as name1, 12 as price, 1001 as _ts
+             | union
+             |   select 3 as id, 'a3' as name1, 25 as price, 1260 as _ts
+             |   ) s0
+             | on t0.id = s0.id
+             | when matched then update set price = s0.price, ts = s0._ts
+          """.stripMargin)
+        checkAnswer(s"select id, name, price, ts, description from $tableName 
order by id")(
+          Seq(1, "a1", 12, 1001, "a1: desc1"),
+          Seq(2, "a2", 20, 1200, "a2: desc2"),
+          Seq(3, "a3", 25, 1260, "a3: desc3")

Review Comment:
   Can we validate from the timeline that clustering actually completed?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -176,32 +185,43 @@ public Option<Schema> getTableAvroSchemaIfPresent(boolean 
includeMetadataFields)
     return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
   }
 
-  private Option<Schema> getTableAvroSchemaInternal(boolean 
includeMetadataFields, Option<HoodieInstant> instantOpt) {
-    Option<Schema> schema =
-        (instantOpt.isPresent()
-            ? getTableSchemaFromCommitMetadata(instantOpt.get(), 
includeMetadataFields)
-            : getTableSchemaFromLatestCommitMetadata(includeMetadataFields))
-            .or(() ->
-                metaClient.getTableConfig().getTableCreateSchema()
-                    .map(tableSchema ->
-                        includeMetadataFields
-                            ? HoodieAvroUtils.addMetadataFields(tableSchema, 
hasOperationField.get())
-                            : tableSchema)
-            )
-            .or(() -> {
-              Option<Schema> schemaFromDataFile = 
getTableAvroSchemaFromDataFileInternal();
-              return includeMetadataFields
-                  ? schemaFromDataFile
-                  : 
schemaFromDataFile.map(HoodieAvroUtils::removeMetadataFields);
-            });
-
-    // TODO partition columns have to be appended in all read-paths
-    if (metaClient.getTableConfig().shouldDropPartitionColumns() && 
schema.isPresent()) {
+  Option<Schema> getTableAvroSchemaInternal(boolean includeMetadataFields, 
Option<HoodieInstant> instantOpt) {
+    return (instantOpt.isPresent()
+        ? getTableSchemaFromCommitMetadata(instantOpt.get(), 
includeMetadataFields)

Review Comment:
   Can you confirm that we are not changing anything here? We just created new 
private methods so that we can reuse them for the new V2 API implementation.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -222,6 +242,50 @@ private Option<Schema> 
getTableSchemaFromLatestCommitMetadata(boolean includeMet
     }
   }
 
+  public Option<Schema> getTableAvroSchemaIfPresentV2(boolean 
includeMetadataFields) {
+    return getTableSchemaFromLatestCommitMetadataV2(includeMetadataFields)
+      .or(() -> getTableCreateSchemaWithMetadata(includeMetadataFields))
+      .or(() -> getSchemaFromDataFileIfPresent(includeMetadataFields))
+        .map(this::handlePartitionColumnsIfNeeded);
+  }
+
+  private Option<Schema> getSchemaFromDataFileIfPresent(boolean 
includeMetadataFields) {
+    Option<Schema> schemaFromDataFile = 
getTableAvroSchemaFromDataFileInternal();
+    return (includeMetadataFields
+        ? schemaFromDataFile
+        : schemaFromDataFile.map(HoodieAvroUtils::removeMetadataFields));
+  }
+
+  private Option<Schema> getTableSchemaFromLatestCommitMetadataV2(boolean 
includeMetadataFields) {

Review Comment:
   Similarly, let's rename these methods.
   
   Also, can we file a follow-up JIRA for the cleanup and reference it in the 
Javadocs here?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -222,6 +244,50 @@ private Option<Schema> 
getTableSchemaFromLatestCommitMetadata(boolean includeMet
     }
   }
 
+  public Option<Schema> getTableAvroSchemaIfPresentV2(boolean 
includeMetadataFields) {

Review Comment:
   Makes sense, gotcha. Can we give this a name specific to clustering, so that 
no one else unintentionally uses it once this patch lands and before we clean 
it up properly?
   
   Can we name this getTableAvroSchemaForClustering?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to