hudi-agent commented on code in PR #18692:
URL: https://github.com/apache/hudi/pull/18692#discussion_r3192816838
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java:
##########
@@ -106,7 +106,10 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final Hood
         Math.min(clusteringPlan.getInputGroups().size(), writeConfig.getClusteringMaxParallelism()),
         new CustomizedThreadFactory("clustering-job-group", true));
     try {
-      boolean canUseRowWriter = getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", true);
+      // shouldForceRowWriter() opts in for subclasses that require the Row path (e.g. when
Review Comment:
🤖 The Javadoc for `shouldForceRowWriter()` (and the PR description) says "the standard Row-writer compatibility check still gates whether the path is actually taken." I don't see such a compatibility check in this code path — the only gate today is `hoodie.datasource.write.row.writer.enable`, which `shouldForceRowWriter()` is now OR-ing past. Could you clarify where that compatibility check lives, or update the doc to match? Otherwise the comment misleads readers into thinking there is a fallback safety net that doesn't exist.
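
If the check doesn't actually exist yet, one way to make the doc true is an explicit gate. A minimal sketch (the `isRowWriterCompatible()` helper is hypothetical and stands in for whatever check the Javadoc means):

```java
// Hypothetical sketch, not the PR's code: force the Row path only when it is
// also compatible, so the documented safety net really exists.
boolean rowWriterEnabled = getWriteConfig().getBooleanOrDefault(
    "hoodie.datasource.write.row.writer.enable", true);
boolean canUseRowWriter = (rowWriterEnabled || shouldForceRowWriter())
    && isRowWriterCompatible(); // placeholder for the promised compatibility check
```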
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java:
##########
@@ -38,22 +40,129 @@
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;

 /**
  * Clustering Strategy based on following.
  * 1) Spark execution engine.
  * 2) Uses bulk_insert to write data into new files.
+ *
+ * <p>When a {@link ClusteringGroupWriter} provider is registered on the runtime classpath
+ * and reports {@link ClusteringGroupWriter#isEnabled()}, this strategy delegates each input
+ * group to the writer. The Row path is forced (subject to the standard Row-writer
+ * compatibility check) so the writer always sees the Dataset-based pipeline. Per-group
+ * fallback to the default Spark path happens whenever the writer returns
+ * {@link Option#empty()}.
  */
 @Slf4j
 public class SparkSortAndSizeExecutionStrategy<T> extends MultipleSparkJobExecutionStrategy<T> {
+  /** Lazily initialized; write config schema is constant for a given strategy instance. */
+  private final AtomicReference<HoodieSchema> cachedSchema = new AtomicReference<>();
+
   public SparkSortAndSizeExecutionStrategy(HoodieTable table,
                                            HoodieEngineContext engineContext,
                                            HoodieWriteConfig writeConfig) {
     super(table, engineContext, writeConfig);
   }

+  @Override
+  protected boolean shouldForceRowWriter() {
+    Option<ClusteringGroupWriter> writer = ClusteringGroupWriterRegistry.get();
+    return writer.isPresent() && writer.get().isEnabled();
+  }
+
+  @Override
+  protected CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsRow(
+      HoodieClusteringGroup clusteringGroup,
+      Map<String, String> strategyParams,
+      boolean shouldPreserveHoodieMetadata,
+      String instantTime,
+      ExecutorService clusteringExecutorService) {
+    Option<CompletableFuture<HoodieData<WriteStatus>>> delegated = tryDelegateToGroupWriter(
+        clusteringGroup, strategyParams, shouldPreserveHoodieMetadata, instantTime, clusteringExecutorService);
+    if (delegated.isPresent()) {
Review Comment:
🤖 Heads up: if the SPI's `runClusteringForGroupAsync` throws synchronously (e.g. NPE, IllegalState) before returning the future, the exception propagates up through `clusteringPlan.getInputGroups().stream().map(...)` in `performClustering`, while the default path always wraps everything inside `CompletableFuture.supplyAsync` so failures are async. Worth either documenting the contract on the SPI ("failures must be reported via a failed future, not by throwing") or wrapping the call in `CompletableFuture.supplyAsync` here so semantics match the default.
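
A sketch of the second option, using a try/catch rather than `supplyAsync` so the synchronous `Option`-based fallback decision still works (names come from the `tryDelegateToGroupWriter` body in this PR):

```java
// Convert a synchronous SPI failure into a failed future so semantics match
// the default path, where failures always surface asynchronously.
Option<CompletableFuture<HoodieData<WriteStatus>>> result;
try {
  result = writer.runClusteringForGroupAsync(context);
} catch (RuntimeException e) {
  CompletableFuture<HoodieData<WriteStatus>> failed = new CompletableFuture<>();
  failed.completeExceptionally(e);
  result = Option.of(failed);
}
```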
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java:
##########
@@ -38,22 +40,129 @@
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;

 /**
  * Clustering Strategy based on following.
  * 1) Spark execution engine.
  * 2) Uses bulk_insert to write data into new files.
+ *
+ * <p>When a {@link ClusteringGroupWriter} provider is registered on the runtime classpath
+ * and reports {@link ClusteringGroupWriter#isEnabled()}, this strategy delegates each input
+ * group to the writer. The Row path is forced (subject to the standard Row-writer
+ * compatibility check) so the writer always sees the Dataset-based pipeline. Per-group
+ * fallback to the default Spark path happens whenever the writer returns
+ * {@link Option#empty()}.
  */
 @Slf4j
 public class SparkSortAndSizeExecutionStrategy<T> extends MultipleSparkJobExecutionStrategy<T> {
+  /** Lazily initialized; write config schema is constant for a given strategy instance. */
+  private final AtomicReference<HoodieSchema> cachedSchema = new AtomicReference<>();
+
   public SparkSortAndSizeExecutionStrategy(HoodieTable table,
                                            HoodieEngineContext engineContext,
                                            HoodieWriteConfig writeConfig) {
     super(table, engineContext, writeConfig);
   }

+  @Override
+  protected boolean shouldForceRowWriter() {
+    Option<ClusteringGroupWriter> writer = ClusteringGroupWriterRegistry.get();
+    return writer.isPresent() && writer.get().isEnabled();
+  }
+
+  @Override
+  protected CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsRow(
+      HoodieClusteringGroup clusteringGroup,
+      Map<String, String> strategyParams,
+      boolean shouldPreserveHoodieMetadata,
+      String instantTime,
+      ExecutorService clusteringExecutorService) {
+    Option<CompletableFuture<HoodieData<WriteStatus>>> delegated = tryDelegateToGroupWriter(
+        clusteringGroup, strategyParams, shouldPreserveHoodieMetadata, instantTime, clusteringExecutorService);
+    if (delegated.isPresent()) {
+      return delegated.get();
+    }
+    return runSuperRunClusteringForGroupAsyncAsRow(
+        clusteringGroup, strategyParams, shouldPreserveHoodieMetadata, instantTime, clusteringExecutorService);
+  }
+
+  /**
+   * Indirection over {@code super.runClusteringForGroupAsyncAsRow} so unit tests can
+   * verify the fallback contract without bootstrapping a real {@link HoodieTable}. Tests
+   * override only this method; the production {@code runClusteringForGroupAsyncAsRow}
+   * routing logic is exercised unchanged.
Review Comment:
🤖 nit: `runSuperRunClusteringForGroupAsyncAsRow` is a bit of a mouthful — the `runSuper` prefix reads oddly since `super` is a language keyword. Something like `executeDefaultRowGroupClustering` or `callSuperRowClustering` might make the test-indirection intent clearer without the keyword collision.
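
For concreteness, the same method under the first suggested name (signature and body copied from the diff; only the identifier changes):

```java
CompletableFuture<HoodieData<WriteStatus>> executeDefaultRowGroupClustering(
    HoodieClusteringGroup clusteringGroup,
    Map<String, String> strategyParams,
    boolean shouldPreserveHoodieMetadata,
    String instantTime,
    ExecutorService clusteringExecutorService) {
  return super.runClusteringForGroupAsyncAsRow(
      clusteringGroup, strategyParams, shouldPreserveHoodieMetadata, instantTime, clusteringExecutorService);
}
```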
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/ClusteringGroupWriteContext.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.run.strategy;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Immutable parameter bundle passed to a {@link ClusteringGroupWriter} for a single
+ * clustering group. Wraps the inputs the SPI needs (clustering group, strategy params,
+ * instant time, executor, schema, table, write config) so the SPI signature can grow
+ * without breaking implementers.
+ */
+public final class ClusteringGroupWriteContext {
+
+  private final HoodieClusteringGroup clusteringGroup;
+  private final Map<String, String> strategyParams;
+  private final boolean shouldPreserveHoodieMetadata;
+  private final String instantTime;
+  private final ExecutorService clusteringExecutorService;
+  private final HoodieSchema schema;
+  private final HoodieTable table;
+  private final HoodieWriteConfig writeConfig;
+
+  private ClusteringGroupWriteContext(Builder b) {
+    this.clusteringGroup = Objects.requireNonNull(b.clusteringGroup, "clusteringGroup");
+    this.strategyParams = b.strategyParams != null
+        ? Collections.unmodifiableMap(b.strategyParams)
+        : Collections.emptyMap();
+    this.shouldPreserveHoodieMetadata = b.shouldPreserveHoodieMetadata;
+    this.instantTime = Objects.requireNonNull(b.instantTime, "instantTime");
+    this.clusteringExecutorService =
+        Objects.requireNonNull(b.clusteringExecutorService, "clusteringExecutorService");
+    this.schema = Objects.requireNonNull(b.schema, "schema");
+    this.table = Objects.requireNonNull(b.table, "table");
+    this.writeConfig = Objects.requireNonNull(b.writeConfig, "writeConfig");
+  }
+
+  public HoodieClusteringGroup getClusteringGroup() {
+    return clusteringGroup;
+  }
+
+  public Map<String, String> getStrategyParams() {
+    return strategyParams;
+  }
+
+  public boolean shouldPreserveHoodieMetadata() {
+    return shouldPreserveHoodieMetadata;
+  }
+
+  public String getInstantTime() {
+    return instantTime;
+  }
+
+  public ExecutorService getClusteringExecutorService() {
+    return clusteringExecutorService;
+  }
+
+  public HoodieSchema getSchema() {
+    return schema;
+  }
+
+  public HoodieTable getTable() {
+    return table;
+  }
+
+  public HoodieWriteConfig getWriteConfig() {
+    return writeConfig;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static final class Builder {
+    private HoodieClusteringGroup clusteringGroup;
+    private Map<String, String> strategyParams;
+    private boolean shouldPreserveHoodieMetadata;
+    private String instantTime;
+    private ExecutorService clusteringExecutorService;
+    private HoodieSchema schema;
+    private HoodieTable table;
+    private HoodieWriteConfig writeConfig;
+
+    public Builder clusteringGroup(HoodieClusteringGroup v) {
Review Comment:
🤖 nit: could you use the field name as the parameter name in all the builder setters (e.g. `clusteringGroup(HoodieClusteringGroup clusteringGroup)`) rather than `v`? The rest of the Hudi codebase uses descriptive parameter names in builders, and `v` loses the name at the call site when reading a stack trace or hovering in an IDE.
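
For illustration, one setter in the suggested shape (assuming the usual fluent `return this;` body, which the quoted diff truncates):

```java
public Builder clusteringGroup(HoodieClusteringGroup clusteringGroup) {
  this.clusteringGroup = clusteringGroup;
  return this;
}
```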
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java:
##########
@@ -38,22 +40,129 @@
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;

 /**
  * Clustering Strategy based on following.
  * 1) Spark execution engine.
  * 2) Uses bulk_insert to write data into new files.
+ *
+ * <p>When a {@link ClusteringGroupWriter} provider is registered on the runtime classpath
+ * and reports {@link ClusteringGroupWriter#isEnabled()}, this strategy delegates each input
+ * group to the writer. The Row path is forced (subject to the standard Row-writer
+ * compatibility check) so the writer always sees the Dataset-based pipeline. Per-group
+ * fallback to the default Spark path happens whenever the writer returns
+ * {@link Option#empty()}.
  */
 @Slf4j
 public class SparkSortAndSizeExecutionStrategy<T> extends MultipleSparkJobExecutionStrategy<T> {
+  /** Lazily initialized; write config schema is constant for a given strategy instance. */
+  private final AtomicReference<HoodieSchema> cachedSchema = new AtomicReference<>();
+
   public SparkSortAndSizeExecutionStrategy(HoodieTable table,
                                            HoodieEngineContext engineContext,
                                            HoodieWriteConfig writeConfig) {
     super(table, engineContext, writeConfig);
   }

+  @Override
+  protected boolean shouldForceRowWriter() {
+    Option<ClusteringGroupWriter> writer = ClusteringGroupWriterRegistry.get();
+    return writer.isPresent() && writer.get().isEnabled();
+  }
+
+  @Override
+  protected CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsRow(
+      HoodieClusteringGroup clusteringGroup,
+      Map<String, String> strategyParams,
+      boolean shouldPreserveHoodieMetadata,
+      String instantTime,
+      ExecutorService clusteringExecutorService) {
+    Option<CompletableFuture<HoodieData<WriteStatus>>> delegated = tryDelegateToGroupWriter(
+        clusteringGroup, strategyParams, shouldPreserveHoodieMetadata, instantTime, clusteringExecutorService);
+    if (delegated.isPresent()) {
+      return delegated.get();
+    }
+    return runSuperRunClusteringForGroupAsyncAsRow(
+        clusteringGroup, strategyParams, shouldPreserveHoodieMetadata, instantTime, clusteringExecutorService);
+  }
+
+  /**
+   * Indirection over {@code super.runClusteringForGroupAsyncAsRow} so unit tests can
+   * verify the fallback contract without bootstrapping a real {@link HoodieTable}. Tests
+   * override only this method; the production {@code runClusteringForGroupAsyncAsRow}
+   * routing logic is exercised unchanged.
+   */
+  CompletableFuture<HoodieData<WriteStatus>> runSuperRunClusteringForGroupAsyncAsRow(
+      HoodieClusteringGroup clusteringGroup,
+      Map<String, String> strategyParams,
+      boolean shouldPreserveHoodieMetadata,
+      String instantTime,
+      ExecutorService clusteringExecutorService) {
+    return super.runClusteringForGroupAsyncAsRow(
+        clusteringGroup, strategyParams, shouldPreserveHoodieMetadata, instantTime, clusteringExecutorService);
+  }
+
+  /**
+   * Routing for the {@link ClusteringGroupWriter} SPI. Returns the writer's future when a
+   * writer is registered, reports enabled, AND can serve the group. Returns
+   * {@link Option#empty()} otherwise so the caller falls back to the default path.
+   *
+   * <p>Package-private so tests can exercise the routing logic directly without needing a
+   * real {@link HoodieTable} to drive {@code super.runClusteringForGroupAsyncAsRow}.
+   */
+  Option<CompletableFuture<HoodieData<WriteStatus>>> tryDelegateToGroupWriter(
+      HoodieClusteringGroup clusteringGroup,
+      Map<String, String> strategyParams,
+      boolean shouldPreserveHoodieMetadata,
+      String instantTime,
+      ExecutorService clusteringExecutorService) {
+    Option<ClusteringGroupWriter> writerOpt = ClusteringGroupWriterRegistry.get();
+    if (!writerOpt.isPresent() || !writerOpt.get().isEnabled()) {
+      return Option.empty();
+    }
+    ClusteringGroupWriter writer = writerOpt.get();
+    log.info("Delegating clustering group (firstFileId={}, instant={}) to ClusteringGroupWriter '{}'",
+        firstFileGroupId(clusteringGroup), instantTime, writer.name());
+    ClusteringGroupWriteContext context = ClusteringGroupWriteContext.builder()
+        .clusteringGroup(clusteringGroup)
+        .strategyParams(strategyParams)
+        .shouldPreserveHoodieMetadata(shouldPreserveHoodieMetadata)
+        .instantTime(instantTime)
+        .clusteringExecutorService(clusteringExecutorService)
+        .schema(getCachedSchema())
+        .table(getHoodieTable())
+        .writeConfig(getWriteConfig())
+        .build();
+    Option<CompletableFuture<HoodieData<WriteStatus>>> result = writer.runClusteringForGroupAsync(context);
Review Comment:
🤖 The schema handed to the SPI here differs from what the default Row path resolves. `super.runClusteringForGroupAsyncAsRow` uses `addMetadataFields(new TableSchemaResolver(metaClient).getTableSchema(false), allowOperationMetadataField)` (and the existing comment notes "incase of MIT, config.getSchema may not contain the full table schema"), whereas `getCachedSchema()` is just `HoodieSchema.parse(getWriteConfig().getSchema())` — no metadata fields and no `TableSchemaResolver`. Could you clarify what the SPI is expected to receive? If implementers assume parity with the default path, they could silently miss `_hoodie_*` columns or get a stale schema in MIT scenarios.
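
If parity with the default path is the intent, here is a sketch of a `getCachedSchema()` that resolves the schema the same way (only the calls quoted above are known to exist on this branch; the wiring and the `HoodieSchema.parse` round-trip through the Avro JSON string are assumptions):

```java
private HoodieSchema getCachedSchema() {
  HoodieSchema resolved = cachedSchema.get();
  if (resolved == null) {
    try {
      // Full table schema plus _hoodie_* metadata fields, as the default Row path
      // resolves it, instead of trusting writeConfig.getSchema() (which can be
      // partial under MIT).
      Schema withMeta = HoodieAvroUtils.addMetadataFields(
          new TableSchemaResolver(getHoodieTable().getMetaClient()).getTableSchema(false),
          getWriteConfig().allowOperationMetadataField());
      resolved = HoodieSchema.parse(withMeta.toString()); // assumed conversion
      cachedSchema.compareAndSet(null, resolved);
    } catch (Exception e) {
      throw new HoodieException("Failed to resolve table schema for ClusteringGroupWriter", e);
    }
  }
  return resolved;
}
```

Whether caching is even safe here, given the MIT staleness concern, is probably worth spelling out in the Javadoc too.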
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]