This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 833ef62055e5 fix: Empty write should not cause spark analysis errors 
with pre-commit validators (#18128)
833ef62055e5 is described below

commit 833ef62055e5d9b98b1c48196a02db7c7b9ca2e3
Author: Krishen <[email protected]>
AuthorDate: Thu Feb 12 11:20:15 2026 -0800

    fix: Empty write should not cause spark analysis errors with pre-commit 
validators (#18128)
    
    
    
    ---------
    
    Co-authored-by: Krishen Bhan <[email protected]>
---
 .../hudi/client/utils/SparkValidatorUtils.java     | 45 +++++++---
 .../hudi/client/utils/TestSparkValidatorUtils.java | 98 ++++++++++++++++++++++
 2 files changed, 131 insertions(+), 12 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
index 97e4473576c2..c7804631d3f6 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.view.HoodieTablePreCommitFileSystemView;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -135,21 +136,39 @@ public class SparkValidatorUtils {
         .collect(Collectors.toList());
 
     if (committedFiles.isEmpty()) {
-      try {
-        return sqlContext.createDataFrame(
-            sqlContext.emptyDataFrame().rdd(),
-            HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(
-                new 
TableSchemaResolver(table.getMetaClient()).getTableSchema()));
-      } catch (Exception e) {
-        LOG.warn("Cannot get table schema from before state.", e);
-        LOG.warn("Using the schema from after state (current transaction) to 
create the empty Spark dataframe: {}", newStructTypeSchema);
-        return sqlContext.createDataFrame(
-            sqlContext.emptyDataFrame().rdd(), newStructTypeSchema);
-      }
+      return createEmptyDataFrameWithTableSchema(sqlContext, table, 
Option.of(newStructTypeSchema));
     }
     return readRecordsForBaseFiles(sqlContext, committedFiles, table);
   }
 
+  /**
+   * Creates an empty DataFrame with table schema for use when there are no 
files to read.
+   * If table schema cannot be resolved and a fallback schema is provided, 
uses that schema;
+   * otherwise returns a schema-less empty DataFrame.
+   *
+   * @param sqlContext     Spark {@link SQLContext} instance.
+   * @param table          {@link HoodieTable} instance.
+   * @param fallbackSchema Optional schema to use when table schema cannot be 
resolved (e.g. after state schema); empty to return schema-less empty DataFrame.
+   * @return Empty DataFrame with table or fallback schema, or schema-less if 
no fallback.
+   */
+  private static Dataset<Row> createEmptyDataFrameWithTableSchema(SQLContext 
sqlContext,
+                                                                  HoodieTable 
table,
+                                                                  
Option<StructType> fallbackSchema) {
+    try {
+      return sqlContext.createDataFrame(
+          sqlContext.emptyDataFrame().rdd(),
+          HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(
+              new 
TableSchemaResolver(table.getMetaClient()).getTableSchema()));
+    } catch (Exception e) {
+      LOG.warn("Could not get table schema for empty DataFrame.", e);
+      if (fallbackSchema.isPresent()) {
+        LOG.warn("Using fallback schema to create the empty Spark DataFrame: 
{}", fallbackSchema.get());
+        return sqlContext.createDataFrame(sqlContext.emptyDataFrame().rdd(), 
fallbackSchema.get());
+      }
+      return sqlContext.emptyDataFrame();
+    }
+  }
+
   /**
    * Get records from specified list of data files.
    */
@@ -201,7 +220,9 @@ public class SparkValidatorUtils {
         .collect(Collectors.toList());
 
     if (newFiles.isEmpty()) {
-      return sqlContext.emptyDataFrame();
+      // Empty write: return empty DataFrame with table schema so validators 
that reference
+      // columns (e.g. _row_key) do not fail with AnalysisException "Column 
... does not exist".
+      return createEmptyDataFrameWithTableSchema(sqlContext, table, 
Option.empty());
     }
 
     return readRecordsForBaseFiles(sqlContext, newFiles, table);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/utils/TestSparkValidatorUtils.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/utils/TestSparkValidatorUtils.java
new file mode 100644
index 000000000000..c53b5ceb6d1c
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/utils/TestSparkValidatorUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import org.apache.spark.api.java.JavaRDD;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+
+/**
+ * Tests for {@link SparkValidatorUtils}.
+ * Guards against "Column ... does not exist" failures when a precommit validation query 
runs against an empty write.
+ */
+public class TestSparkValidatorUtils extends HoodieClientTestBase {
+
+  /**
+   * When a SQL validation query references column names (e.g. _row_key) and 
the write has
+   * no new base files (empty commit), getRecordsFromPendingCommits returns an 
empty DataFrame.
+   * Without the fix, that DataFrame has no schema, so the validator fails with
+   * AnalysisException: Column '_row_key' does not exist.
+   * This test ensures validation still succeeds by using the table schema 
for the empty DataFrame.
+   */
+  @Test
+  public void testSqlQueryValidatorWithNoRecords() throws Exception {
+    HoodieWriteConfig writeConfig = getConfigBuilder().build();
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
+
+      // Create initial commit with data so table has schema and data
+      String firstCommit = "001";
+      writeBatch(
+          writeClient,
+          firstCommit,
+          "000",
+          Option.empty(),
+          "000",
+          10,
+          generateWrapRecordsFn(false, writeConfig, dataGen::generateInserts),
+          SparkRDDWriteClient::bulkInsert,
+          true,
+          10,
+          10,
+          1,
+          false,
+          INSTANT_GENERATOR);
+
+      // Empty insert so the commit goes through the executor and runs 
pre-commit validators.
+      // Start the commit (creates REQUESTED) so the executor can transition 
to INFLIGHT; then insert(empty)
+      // runs the executor, which runs validators. 
getRecordsFromPendingCommits returns empty DataFrame
+      // with table schema; validator must not throw.
+      metaClient.reloadActiveTimeline();
+      String secondCommit = "002";
+      HoodieWriteConfig configWithValidator = getConfigBuilder()
+          .withPreCommitValidatorConfig(
+              HoodiePreCommitValidatorConfig.newBuilder()
+                  
.withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName())
+                  .withPrecommitValidatorSingleResultSqlQueries(
+                      "SELECT COUNT(1) AS result FROM (SELECT _row_key FROM 
<TABLE_NAME> GROUP BY _row_key HAVING COUNT(1) > 1) a#0")
+                  .build())
+          .build();
+
+      try (SparkRDDWriteClient validatorClient = 
getHoodieWriteClient(configWithValidator)) {
+        WriteClientTestUtils.startCommitWithTime(validatorClient, 
secondCommit);
+        JavaRDD<HoodieRecord> emptyRecords = jsc.emptyRDD();
+        validatorClient.insert(emptyRecords, secondCommit);
+      }
+
+      metaClient.reloadActiveTimeline();
+      Assertions.assertEquals(2, 
metaClient.getActiveTimeline().countInstants(),
+          "Should have 2 commits (one with data, one empty)");
+    }
+  }
+}

Reply via email to