hudi-agent commented on code in PR #18692:
URL: https://github.com/apache/hudi/pull/18692#discussion_r3192816838


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -106,7 +106,10 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood
         Math.min(clusteringPlan.getInputGroups().size(), writeConfig.getClusteringMaxParallelism()),
         new CustomizedThreadFactory("clustering-job-group", true));
     try {
-      boolean canUseRowWriter = getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", true);
+      // shouldForceRowWriter() opts in for subclasses that require the Row path (e.g. when

Review Comment:
   🤖 The Javadoc for `shouldForceRowWriter()` (and the PR description) says 
"the standard Row-writer compatibility check still gates whether the path is 
actually taken." I don't see such a compatibility check in this code path. The 
only gate today is `hoodie.datasource.write.row.writer.enable`, and 
`shouldForceRowWriter()` is now OR-ed with it, so a forcing subclass bypasses 
that flag. Could you point to where the compatibility check lives, or update 
the doc to match? As written, the comment leads readers to expect a fallback 
safety net that doesn't exist.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
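
   For illustration, a minimal sketch of the gate as this comment reads it. 
`RowWriterGateSketch` and its boolean parameters are hypothetical stand-ins, not 
Hudi code; only the config key and the name `shouldForceRowWriter()` come from 
the diff, and the OR is inferred from the comment above.

```java
public class RowWriterGateSketch {
  /**
   * Hypothetical model of the new gate: the Row path is taken when the
   * subclass forces it OR when hoodie.datasource.write.row.writer.enable
   * is true. No further compatibility check is modelled, because none is
   * visible in the hunk under review.
   */
  public static boolean canUseRowWriter(boolean forceRowWriter, boolean rowWriterEnabledFlag) {
    return forceRowWriter || rowWriterEnabledFlag;
  }

  public static void main(String[] args) {
    // Flag disabled by the user, but the subclass forces the Row path:
    // nothing else in this model can veto it.
    System.out.println(canUseRowWriter(true, false));  // true
    System.out.println(canUseRowWriter(false, false)); // false
  }
}
```

   If a real compatibility check exists elsewhere, it would show up as an 
additional `&&` term in this expression.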



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java:
##########
@@ -38,22 +40,129 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Clustering Strategy based on following.
  * 1) Spark execution engine.
  * 2) Uses bulk_insert to write data into new files.
+ *
+ * <p>When a {@link ClusteringGroupWriter} provider is registered on the runtime classpath
+ * and reports {@link ClusteringGroupWriter#isEnabled()}, this strategy delegates each input
+ * group to the writer. The Row path is forced (subject to the standard Row-writer
+ * compatibility check) so the writer always sees the Dataset-based pipeline. Per-group
+ * fallback to the default Spark path happens whenever the writer returns
+ * {@link Option#empty()}.
  */
 @Slf4j
 public class SparkSortAndSizeExecutionStrategy<T>
     extends MultipleSparkJobExecutionStrategy<T> {
 
+  /** Lazily initialized; write config schema is constant for a given strategy instance. */
+  private final AtomicReference<HoodieSchema> cachedSchema = new AtomicReference<>();
+
   public SparkSortAndSizeExecutionStrategy(HoodieTable table,
                                            HoodieEngineContext engineContext,
                                            HoodieWriteConfig writeConfig) {
     super(table, engineContext, writeConfig);
   }
 
+  @Override
+  protected boolean shouldForceRowWriter() {
+    Option<ClusteringGroupWriter> writer = ClusteringGroupWriterRegistry.get();
+    return writer.isPresent() && writer.get().isEnabled();
+  }
+
+  @Override
+  protected CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsRow(
+      HoodieClusteringGroup clusteringGroup,
+      Map<String, String> strategyParams,
+      boolean shouldPreserveHoodieMetadata,
+      String instantTime,
+      ExecutorService clusteringExecutorService) {
+    Option<CompletableFuture<HoodieData<WriteStatus>>> delegated = tryDelegateToGroupWriter(
+        clusteringGroup, strategyParams, shouldPreserveHoodieMetadata, instantTime, clusteringExecutorService);
+    if (delegated.isPresent()) {

Review Comment:
   🤖 Heads up: if the SPI's `runClusteringForGroupAsync` throws synchronously 
(e.g. NPE, IllegalState) before returning the future, the exception propagates 
up through `clusteringPlan.getInputGroups().stream().map(...)` in 
`performClustering`, while the default path always wraps everything inside 
`CompletableFuture.supplyAsync` so failures are async. Worth either documenting 
the contract on the SPI ("failures must be reported via a failed future, not by 
throwing") or wrapping the call in `CompletableFuture.supplyAsync` here so 
semantics match the default.
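
   A minimal sketch of the second option, assuming the SPI call can be 
expressed as a `Supplier` of a future. `AsyncFailureSketch` and `callAsync` are 
hypothetical names, not part of the PR; the point is only that an exception 
thrown before the future is returned ends up as a failed future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

public class AsyncFailureSketch {
  /**
   * Runs a future-returning call on the given executor. If the call throws
   * before returning its future, supplyAsync captures the exception and the
   * composed future completes exceptionally instead of throwing at the caller.
   */
  public static <T> CompletableFuture<T> callAsync(
      Supplier<CompletableFuture<T>> call, Executor executor) {
    return CompletableFuture.supplyAsync(call, executor).thenCompose(f -> f);
  }

  public static void main(String[] args) {
    Supplier<CompletableFuture<String>> throwing = () -> {
      throw new IllegalStateException("thrown before a future was returned");
    };
    // A same-thread executor keeps the demo deterministic.
    CompletableFuture<String> result = callAsync(throwing, Runnable::run);
    System.out.println(result.isCompletedExceptionally()); // true
  }
}
```

   One trade-off of this shape: the SPI call itself now runs on the executor 
thread, so any work it does before returning its future occupies a pool slot.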
   



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java:
##########
@@ -38,22 +40,129 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Clustering Strategy based on following.
  * 1) Spark execution engine.
  * 2) Uses bulk_insert to write data into new files.
+ *
+ * <p>When a {@link ClusteringGroupWriter} provider is registered on the runtime classpath
+ * and reports {@link ClusteringGroupWriter#isEnabled()}, this strategy delegates each input
+ * group to the writer. The Row path is forced (subject to the standard Row-writer
+ * compatibility check) so the writer always sees the Dataset-based pipeline. Per-group
+ * fallback to the default Spark path happens whenever the writer returns
+ * {@link Option#empty()}.
  */
 @Slf4j
 public class SparkSortAndSizeExecutionStrategy<T>
     extends MultipleSparkJobExecutionStrategy<T> {
 
+  /** Lazily initialized; write config schema is constant for a given strategy instance. */
+  private final AtomicReference<HoodieSchema> cachedSchema = new AtomicReference<>();
+
   public SparkSortAndSizeExecutionStrategy(HoodieTable table,
                                            HoodieEngineContext engineContext,
                                            HoodieWriteConfig writeConfig) {
     super(table, engineContext, writeConfig);
   }
 
+  @Override
+  protected boolean shouldForceRowWriter() {
+    Option<ClusteringGroupWriter> writer = ClusteringGroupWriterRegistry.get();
+    return writer.isPresent() && writer.get().isEnabled();
+  }
+
+  @Override
+  protected CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsRow(
+      HoodieClusteringGroup clusteringGroup,
+      Map<String, String> strategyParams,
+      boolean shouldPreserveHoodieMetadata,
+      String instantTime,
+      ExecutorService clusteringExecutorService) {
+    Option<CompletableFuture<HoodieData<WriteStatus>>> delegated = tryDelegateToGroupWriter(
+        clusteringGroup, strategyParams, shouldPreserveHoodieMetadata, instantTime, clusteringExecutorService);
+    if (delegated.isPresent()) {
+      return delegated.get();
+    }
+    return runSuperRunClusteringForGroupAsyncAsRow(
+        clusteringGroup, strategyParams, shouldPreserveHoodieMetadata, instantTime, clusteringExecutorService);
+  }
+
+  /**
+   * Indirection over {@code super.runClusteringForGroupAsyncAsRow} so unit tests can
+   * verify the fallback contract without bootstrapping a real {@link HoodieTable}. Tests
+   * override only this method; the production {@code runClusteringForGroupAsyncAsRow}
+   * routing logic is exercised unchanged.

Review Comment:
   🤖 nit: `runSuperRunClusteringForGroupAsyncAsRow` is a bit of a mouthful — 
the `runSuper` prefix reads oddly since `super` is a language keyword. 
Something like `executeDefaultRowGroupClustering` or `callSuperRowClustering` 
might make the test-indirection intent clearer without the keyword collision.
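
   For readers unfamiliar with the pattern being named here, a small sketch of 
such a test seam. All class and method names below (`BaseStrategy`, 
`DelegatingStrategy`, `runDefaultPath`) are hypothetical simplifications, not 
the PR's code.

```java
public class TestSeamSketch {
  /** Stand-in for the parent strategy. */
  public static class BaseStrategy {
    public String runForGroup(String group) {
      return "default:" + group;
    }
  }

  public static class DelegatingStrategy extends BaseStrategy {
    @Override
    public String runForGroup(String group) {
      // Routing logic under test: delegate some groups, fall back otherwise.
      if (group.startsWith("special")) {
        return "delegated:" + group;
      }
      return runDefaultPath(group);
    }

    /** Test seam: the only method that touches super, so tests can stub it. */
    public String runDefaultPath(String group) {
      return super.runForGroup(group);
    }
  }

  public static void main(String[] args) {
    // A test overrides only the seam; the routing above runs unchanged.
    DelegatingStrategy stubbed = new DelegatingStrategy() {
      @Override
      public String runDefaultPath(String group) {
        return "stub:" + group;
      }
    };
    System.out.println(stubbed.runForGroup("special-1")); // delegated:special-1
    System.out.println(stubbed.runForGroup("plain-1"));   // stub:plain-1
    System.out.println(new DelegatingStrategy().runForGroup("plain-1")); // default:plain-1
  }
}
```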
   



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ClusteringGroupWriteContext.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.run.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Immutable parameter bundle passed to a {@link ClusteringGroupWriter} for a single
+ * clustering group. Wraps the inputs the SPI needs (clustering group, strategy params,
+ * instant time, executor, schema, table, write config) so the SPI signature can grow
+ * without breaking implementers.
+ */
+public final class ClusteringGroupWriteContext {
+
+  private final HoodieClusteringGroup clusteringGroup;
+  private final Map<String, String> strategyParams;
+  private final boolean shouldPreserveHoodieMetadata;
+  private final String instantTime;
+  private final ExecutorService clusteringExecutorService;
+  private final HoodieSchema schema;
+  private final HoodieTable table;
+  private final HoodieWriteConfig writeConfig;
+
+  private ClusteringGroupWriteContext(Builder b) {
+    this.clusteringGroup = Objects.requireNonNull(b.clusteringGroup, "clusteringGroup");
+    this.strategyParams = b.strategyParams != null
+        ? Collections.unmodifiableMap(b.strategyParams)
+        : Collections.emptyMap();
+    this.shouldPreserveHoodieMetadata = b.shouldPreserveHoodieMetadata;
+    this.instantTime = Objects.requireNonNull(b.instantTime, "instantTime");
+    this.clusteringExecutorService =
+        Objects.requireNonNull(b.clusteringExecutorService, "clusteringExecutorService");
+    this.schema = Objects.requireNonNull(b.schema, "schema");
+    this.table = Objects.requireNonNull(b.table, "table");
+    this.writeConfig = Objects.requireNonNull(b.writeConfig, "writeConfig");
+  }
+
+  public HoodieClusteringGroup getClusteringGroup() {
+    return clusteringGroup;
+  }
+
+  public Map<String, String> getStrategyParams() {
+    return strategyParams;
+  }
+
+  public boolean shouldPreserveHoodieMetadata() {
+    return shouldPreserveHoodieMetadata;
+  }
+
+  public String getInstantTime() {
+    return instantTime;
+  }
+
+  public ExecutorService getClusteringExecutorService() {
+    return clusteringExecutorService;
+  }
+
+  public HoodieSchema getSchema() {
+    return schema;
+  }
+
+  public HoodieTable getTable() {
+    return table;
+  }
+
+  public HoodieWriteConfig getWriteConfig() {
+    return writeConfig;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static final class Builder {
+    private HoodieClusteringGroup clusteringGroup;
+    private Map<String, String> strategyParams;
+    private boolean shouldPreserveHoodieMetadata;
+    private String instantTime;
+    private ExecutorService clusteringExecutorService;
+    private HoodieSchema schema;
+    private HoodieTable table;
+    private HoodieWriteConfig writeConfig;
+
+    public Builder clusteringGroup(HoodieClusteringGroup v) {

Review Comment:
   🤖 nit: could you use the field name as the parameter name in all the builder 
setters (e.g. `clusteringGroup(HoodieClusteringGroup clusteringGroup)`) rather 
than `v`? The rest of the Hudi codebase uses descriptive parameter names in 
builders, and `v` loses the name at the call site when reading a stack trace or 
hovering in an IDE.
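
   A short sketch of the suggested setter style, reduced to two fields. 
`GroupContextSketch` is a hypothetical cut-down class using `String` and 
`boolean` in place of the Hudi types; it is not the PR's 
`ClusteringGroupWriteContext`.

```java
import java.util.Objects;

public final class GroupContextSketch {
  private final String instantTime;
  private final boolean shouldPreserveHoodieMetadata;

  private GroupContextSketch(Builder builder) {
    this.instantTime = Objects.requireNonNull(builder.instantTime, "instantTime");
    this.shouldPreserveHoodieMetadata = builder.shouldPreserveHoodieMetadata;
  }

  public String getInstantTime() {
    return instantTime;
  }

  public boolean shouldPreserveHoodieMetadata() {
    return shouldPreserveHoodieMetadata;
  }

  public static Builder builder() {
    return new Builder();
  }

  public static final class Builder {
    private String instantTime;
    private boolean shouldPreserveHoodieMetadata;

    // Parameter names mirror the field names, so IDE hints stay descriptive.
    public Builder instantTime(String instantTime) {
      this.instantTime = instantTime;
      return this;
    }

    public Builder shouldPreserveHoodieMetadata(boolean shouldPreserveHoodieMetadata) {
      this.shouldPreserveHoodieMetadata = shouldPreserveHoodieMetadata;
      return this;
    }

    public GroupContextSketch build() {
      return new GroupContextSketch(this);
    }
  }
}
```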
   



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java:
##########
@@ -38,22 +40,129 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Clustering Strategy based on following.
  * 1) Spark execution engine.
  * 2) Uses bulk_insert to write data into new files.
+ *
+ * <p>When a {@link ClusteringGroupWriter} provider is registered on the runtime classpath
+ * and reports {@link ClusteringGroupWriter#isEnabled()}, this strategy delegates each input
+ * group to the writer. The Row path is forced (subject to the standard Row-writer
+ * compatibility check) so the writer always sees the Dataset-based pipeline. Per-group
+ * fallback to the default Spark path happens whenever the writer returns
+ * {@link Option#empty()}.
  */
 @Slf4j
 public class SparkSortAndSizeExecutionStrategy<T>
     extends MultipleSparkJobExecutionStrategy<T> {
 
+  /** Lazily initialized; write config schema is constant for a given strategy instance. */
+  private final AtomicReference<HoodieSchema> cachedSchema = new AtomicReference<>();
+
   public SparkSortAndSizeExecutionStrategy(HoodieTable table,
                                            HoodieEngineContext engineContext,
                                            HoodieWriteConfig writeConfig) {
     super(table, engineContext, writeConfig);
   }
 
+  @Override
+  protected boolean shouldForceRowWriter() {
+    Option<ClusteringGroupWriter> writer = ClusteringGroupWriterRegistry.get();
+    return writer.isPresent() && writer.get().isEnabled();
+  }
+
+  @Override
+  protected CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsRow(
+      HoodieClusteringGroup clusteringGroup,
+      Map<String, String> strategyParams,
+      boolean shouldPreserveHoodieMetadata,
+      String instantTime,
+      ExecutorService clusteringExecutorService) {
+    Option<CompletableFuture<HoodieData<WriteStatus>>> delegated = tryDelegateToGroupWriter(
+        clusteringGroup, strategyParams, shouldPreserveHoodieMetadata, instantTime, clusteringExecutorService);
+    if (delegated.isPresent()) {
+      return delegated.get();
+    }
+    return runSuperRunClusteringForGroupAsyncAsRow(
+        clusteringGroup, strategyParams, shouldPreserveHoodieMetadata, instantTime, clusteringExecutorService);
+  }
+
+  /**
+   * Indirection over {@code super.runClusteringForGroupAsyncAsRow} so unit tests can
+   * verify the fallback contract without bootstrapping a real {@link HoodieTable}. Tests
+   * override only this method; the production {@code runClusteringForGroupAsyncAsRow}
+   * routing logic is exercised unchanged.
+   */
+  CompletableFuture<HoodieData<WriteStatus>> runSuperRunClusteringForGroupAsyncAsRow(
+      HoodieClusteringGroup clusteringGroup,
+      Map<String, String> strategyParams,
+      boolean shouldPreserveHoodieMetadata,
+      String instantTime,
+      ExecutorService clusteringExecutorService) {
+    return super.runClusteringForGroupAsyncAsRow(
+        clusteringGroup, strategyParams, shouldPreserveHoodieMetadata, instantTime, clusteringExecutorService);
+  }
+
+  /**
+   * Routing for the {@link ClusteringGroupWriter} SPI. Returns the writer's future when a
+   * writer is registered, reports enabled, AND can serve the group. Returns
+   * {@link Option#empty()} otherwise so the caller falls back to the default path.
+   *
+   * <p>Package-private so tests can exercise the routing logic directly without needing a
+   * real {@link HoodieTable} to drive {@code super.runClusteringForGroupAsyncAsRow}.
+   */
+  Option<CompletableFuture<HoodieData<WriteStatus>>> tryDelegateToGroupWriter(
+      HoodieClusteringGroup clusteringGroup,
+      Map<String, String> strategyParams,
+      boolean shouldPreserveHoodieMetadata,
+      String instantTime,
+      ExecutorService clusteringExecutorService) {
+    Option<ClusteringGroupWriter> writerOpt = ClusteringGroupWriterRegistry.get();
+    if (!writerOpt.isPresent() || !writerOpt.get().isEnabled()) {
+      return Option.empty();
+    }
+    ClusteringGroupWriter writer = writerOpt.get();
+    log.info("Delegating clustering group (firstFileId={}, instant={}) to ClusteringGroupWriter '{}'",
+        firstFileGroupId(clusteringGroup), instantTime, writer.name());
+    ClusteringGroupWriteContext context = ClusteringGroupWriteContext.builder()
+        .clusteringGroup(clusteringGroup)
+        .strategyParams(strategyParams)
+        .shouldPreserveHoodieMetadata(shouldPreserveHoodieMetadata)
+        .instantTime(instantTime)
+        .clusteringExecutorService(clusteringExecutorService)
+        .schema(getCachedSchema())
+        .table(getHoodieTable())
+        .writeConfig(getWriteConfig())
+        .build();
+    Option<CompletableFuture<HoodieData<WriteStatus>>> result = writer.runClusteringForGroupAsync(context);

Review Comment:
   🤖 The schema handed to the SPI here differs from what the default Row path 
resolves. `super.runClusteringForGroupAsyncAsRow` uses `addMetadataFields(new 
TableSchemaResolver(metaClient).getTableSchema(false), 
allowOperationMetadataField)` (and the existing comment notes "incase of MIT, 
config.getSchema may not contain the full table schema"), whereas 
`getCachedSchema()` is just `HoodieSchema.parse(getWriteConfig().getSchema())` 
— no metadata fields and no `TableSchemaResolver`. Could you clarify what the 
SPI is expected to receive? If implementers assume parity with the default 
path, they could silently miss `_hoodie_*` columns or get a stale schema in MIT 
scenarios.
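
   To make the gap concrete, a toy model that treats a schema as a plain list 
of field names. `SchemaParitySketch`, `withMetaFields`, and `missing` are 
hypothetical helpers, not Hudi APIs, and the data fields `id` and `ts` are made 
up; only the five `_hoodie_*` column names are the standard Hudi metadata 
fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SchemaParitySketch {
  // The five standard Hudi metadata columns.
  static final List<String> META_FIELDS = Arrays.asList(
      "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
      "_hoodie_partition_path", "_hoodie_file_name");

  /** Toy stand-in for prepending metadata fields to a data schema. */
  public static List<String> withMetaFields(List<String> dataFields) {
    List<String> all = new ArrayList<>(META_FIELDS);
    all.addAll(dataFields);
    return all;
  }

  /** Fields present in the expected schema but absent from the actual one. */
  public static List<String> missing(List<String> expected, List<String> actual) {
    List<String> diff = new ArrayList<>(expected);
    diff.removeAll(actual);
    return diff;
  }

  public static void main(String[] args) {
    List<String> writeConfigSchema = Arrays.asList("id", "ts");      // what the SPI would receive
    List<String> defaultPathSchema = withMetaFields(writeConfigSchema); // what the default path would use
    // Prints the metadata columns an implementer assuming parity would not see.
    System.out.println(missing(defaultPathSchema, writeConfigSchema));
  }
}
```

   A parity assertion of this kind in the SPI tests would catch the mismatch 
early, whichever schema the contract ends up specifying.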
   


