hudi-agent commented on code in PR #18885:
URL: https://github.com/apache/hudi/pull/18885#discussion_r3336308019


##########
hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java:
##########
@@ -94,12 +96,45 @@ public static HoodieCommitMetadata buildMetadata(List<HoodieWriteStat> writeStat
     if (extraMetadata.isPresent()) {
       extraMetadata.get().forEach(commitMetadata::addMetadata);
     }
-    commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, (schemaToStoreInCommit == null || schemaToStoreInCommit.equals(NULL_SCHEMA_STR))
-        ? "" : schemaToStoreInCommit);
+    commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY,
+        sanitizeSchemaForCommitMetadata(schemaToStoreInCommit));
     commitMetadata.setOperationType(operationType);
     return commitMetadata;
   }
 
+  /**
+   * Returns the value to persist under {@link HoodieCommitMetadata#SCHEMA_KEY}.
+   * The schema stored in commit extraMetadata must be the user/write schema and
+   * must NOT contain Hudi meta fields ({@code _hoodie_commit_time}, etc.). If
+   * the caller-provided schema has meta fields (e.g. because some upstream code
+   * mutated the in-memory write config schema with reader-schema-with-meta-fields,
+   * or because a previously-polluted SCHEMA_KEY was read back into the config),
+   * this strips them so the persisted schema is always clean. When no meta fields
+   * are present, the input string is returned unchanged.
+   */
+  public static String sanitizeSchemaForCommitMetadata(String schemaToStoreInCommit) {
+    if (StringUtils.isNullOrEmpty(schemaToStoreInCommit) || schemaToStoreInCommit.equals(NULL_SCHEMA_STR)) {
+      return "";
+    }

Review Comment:
   🤖 Several other commit paths still write `SCHEMA_KEY` directly without 
going through `buildMetadata`, and therefore bypass the new sanitization: 
`RunCompactionActionExecutor.execute` (Spark/Java compaction, lines ~106/109), 
`CompactHelpers.createCompactionMetadata` (used by Flink compaction, line ~82), 
`TransactionUtils.resolveWriteConflictIfAny` (line ~104), and 
`BaseHoodieWriteClient.saveInternalSchema` (line ~383). The PR description 
names compaction reader-schema setup and conflict resolution as upstream 
pollution sources, so it seems worth either routing those writes through 
`sanitizeSchemaForCommitMetadata` too (it is now public) or noting that they 
are intentionally out of scope. @yihua, what do you think: should this PR 
extend to those paths, or are they better handled as a follow-up?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java:
##########
@@ -129,6 +134,99 @@ public void testClusteringWithRow() throws IOException {
     writeAndClustering(true);
   }
 
+  /**
+   * Asserts that the schema persisted under HoodieCommitMetadata.SCHEMA_KEY in a completed
+   * replace (clustering) commit does NOT contain Hudi meta fields like _hoodie_commit_time.
+   * The schema stored in commit metadata is meant to be the user/write schema.
+   */
+  @Test
+  public void testReplaceCommitSchemaHasNoMetaFields() throws Exception {
+    setup(102400);
+    config.setValue("hoodie.datasource.write.row.writer.enable", "false");
+    config.setValue("hoodie.metadata.enable", "false");
+    config.setValue("hoodie.clustering.plan.strategy.daybased.lookback.partitions", "1");
+    config.setValue("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(1024 * 1024));
+    config.setValue("hoodie.clustering.plan.strategy.max.bytes.per.group", String.valueOf(2 * 1024 * 1024));
+
+    writeData(1000, true, System.currentTimeMillis());
+
+    String clusteringTime = (String) writeClient.scheduleClustering(Option.empty()).get();
+    writeClient.cluster(clusteringTime, true);
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieInstant replaceInstant = metaClient.getActiveTimeline()
+        .getCompletedReplaceTimeline()
+        .filter(i -> i.requestedTime().equals(clusteringTime))
+        .firstInstant()
+        .orElseThrow(() -> new AssertionError("No completed replace commit found for " + clusteringTime));
+
+    HoodieReplaceCommitMetadata replaceCommitMetadata =
+        metaClient.getActiveTimeline().readReplaceCommitMetadata(replaceInstant);
+    assertSchemaHasNoMetaFields(replaceCommitMetadata, "replace (clustering) commit");
+  }
+
+  /**
+   * Even when {@code config.getSchema()} is pre-polluted with Hudi meta fields
+   * (simulating upstream paths like compaction reader-schema setup that may set
+   * a schema-with-meta-fields back onto the write config), both ingestion and
+   * clustering commits must persist a clean schema (without meta fields) under
+   * {@link HoodieCommitMetadata#SCHEMA_KEY}. This guards the sanitization in
+   * {@code CommitUtils#sanitizeSchemaForCommitMetadata(String)}.
+   */
+  @Test
+  public void testCommitSchemaCleanedEvenWhenConfigSchemaHasMetaFields() throws Exception {
+    setup(102400);
+    config.setValue("hoodie.datasource.write.row.writer.enable", "false");
+    config.setValue("hoodie.metadata.enable", "false");

Review Comment:
   🤖 nit: these five `config.setValue(...)` lines are identical to lines 
146-150 in `testReplaceCommitSchemaHasNoMetaFields`. Have you considered 
pulling them into a `setupClusteringConfig()` helper, so that a future change 
to a key or value only needs to happen in one place?
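
   A minimal sketch of such a helper, assuming the five shared settings are the ones shown in `testReplaceCommitSchemaHasNoMetaFields`. The class name `ClusteringTestConfig` and the `Map`-based config are hypothetical stand-ins so the snippet compiles on its own; in the real test the helper would call `config.setValue(...)` on the existing test config instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in: a plain Map replaces the test's real write config object.
final class ClusteringTestConfig {

  // Applies the clustering settings shared by both tests in one place.
  static void setupClusteringConfig(Map<String, String> config) {
    config.put("hoodie.datasource.write.row.writer.enable", "false");
    config.put("hoodie.metadata.enable", "false");
    config.put("hoodie.clustering.plan.strategy.daybased.lookback.partitions", "1");
    config.put("hoodie.clustering.plan.strategy.target.file.max.bytes",
        String.valueOf(1024 * 1024));
    config.put("hoodie.clustering.plan.strategy.max.bytes.per.group",
        String.valueOf(2 * 1024 * 1024));
  }

  // Convenience factory for callers that start from an empty config.
  static Map<String, String> newClusteringConfig() {
    Map<String, String> config = new LinkedHashMap<>();
    setupClusteringConfig(config);
    return config;
  }
}
```

   Each test would then call the helper once after `setup(102400)` and only override the values that differ.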
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/util/CommitUtils.java:
##########
@@ -94,12 +96,45 @@ public static HoodieCommitMetadata buildMetadata(List<HoodieWriteStat> writeStat
     if (extraMetadata.isPresent()) {
       extraMetadata.get().forEach(commitMetadata::addMetadata);
     }
-    commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, (schemaToStoreInCommit == null || schemaToStoreInCommit.equals(NULL_SCHEMA_STR))
-        ? "" : schemaToStoreInCommit);
+    commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY,
+        sanitizeSchemaForCommitMetadata(schemaToStoreInCommit));
     commitMetadata.setOperationType(operationType);
     return commitMetadata;
   }
 
+  /**
+   * Returns the value to persist under {@link HoodieCommitMetadata#SCHEMA_KEY}.
+   * The schema stored in commit extraMetadata must be the user/write schema and
+   * must NOT contain Hudi meta fields ({@code _hoodie_commit_time}, etc.). If
+   * the caller-provided schema has meta fields (e.g. because some upstream code
+   * mutated the in-memory write config schema with reader-schema-with-meta-fields,
+   * or because a previously-polluted SCHEMA_KEY was read back into the config),
+   * this strips them so the persisted schema is always clean. When no meta fields
+   * are present, the input string is returned unchanged.
+   */
+  public static String sanitizeSchemaForCommitMetadata(String schemaToStoreInCommit) {
+    if (StringUtils.isNullOrEmpty(schemaToStoreInCommit) || schemaToStoreInCommit.equals(NULL_SCHEMA_STR)) {
+      return "";
+    }
+    if (!containsHudiMetaField(schemaToStoreInCommit)) {
+      return schemaToStoreInCommit;
+    }
+    HoodieSchema schema = HoodieSchema.parse(schemaToStoreInCommit);
+    if (schema.isSchemaNull()) {
+      return "";
+    }
+    return HoodieSchemaUtils.removeMetadataFields(schema).toString();
+  }
+
+  private static boolean containsHudiMetaField(String schemaStr) {
+    for (String metaField : HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION) {

Review Comment:
   🤖 nit: could you simplify this to a stream one-liner? `return 
HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.stream().anyMatch(schemaStr::contains);`
 This matches the pattern used elsewhere in the codebase and avoids the manual 
loop.
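
   To make the comparison concrete, here is a small self-contained sketch of both forms. The diff above cuts off before the loop body, so the loop version is an assumed reconstruction (a plain substring check per meta field), and `META_COLUMNS` is a hypothetical stand-in for `HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION` with assumed contents.

```java
import java.util.Arrays;
import java.util.List;

final class MetaFieldCheck {

  // Stand-in for HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION (contents assumed).
  static final List<String> META_COLUMNS = Arrays.asList(
      "_hoodie_commit_time",
      "_hoodie_commit_seqno",
      "_hoodie_record_key",
      "_hoodie_partition_path",
      "_hoodie_file_name",
      "_hoodie_operation");

  // Assumed shape of the manual loop: true if any meta field name occurs in the string.
  static boolean containsMetaFieldLoop(String schemaStr) {
    for (String metaField : META_COLUMNS) {
      if (schemaStr.contains(metaField)) {
        return true;
      }
    }
    return false;
  }

  // Stream form suggested in the review; short-circuits on the first match like the loop.
  static boolean containsMetaFieldStream(String schemaStr) {
    return META_COLUMNS.stream().anyMatch(schemaStr::contains);
  }
}
```

   Both forms are substring checks on the raw schema string, so either can report a match for a name that appears only inside a doc string or a longer field name. In the diff this check only decides whether to take the parse-and-strip path, so a spurious match costs an extra parse rather than a wrong result.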
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>


