This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 833ef62055e5 fix: Empty write should not cause spark analysis errors
with pre-commit validators (#18128)
833ef62055e5 is described below
commit 833ef62055e5d9b98b1c48196a02db7c7b9ca2e3
Author: Krishen <[email protected]>
AuthorDate: Thu Feb 12 11:20:15 2026 -0800
fix: Empty write should not cause spark analysis errors with pre-commit
validators (#18128)
---------
Co-authored-by: Krishen Bhan <[email protected]>
---
.../hudi/client/utils/SparkValidatorUtils.java | 45 +++++++---
.../hudi/client/utils/TestSparkValidatorUtils.java | 98 ++++++++++++++++++++++
2 files changed, 131 insertions(+), 12 deletions(-)
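
For context, a minimal standalone Spark sketch of the failure mode this patch addresses (not part of the commit; the class name, session setup, and view names are illustrative): selecting a column from a schema-less empty DataFrame fails analysis, while an empty DataFrame created with an explicit StructType resolves the same column.

import java.util.Collections;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class EmptyDataFrameSchemaSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate();

    // A schema-less empty DataFrame: referencing any named column fails analysis.
    spark.emptyDataFrame().createOrReplaceTempView("no_schema");
    try {
      spark.sql("SELECT _row_key FROM no_schema").show();
    } catch (Exception e) {
      // Spark raises AnalysisException: _row_key cannot be resolved,
      // which is how the validators failed before this fix.
      System.out.println("Failed as expected: " + e.getMessage());
    }

    // An empty DataFrame that still carries the table's schema: the same query succeeds.
    StructType schema = new StructType(new StructField[]{
        new StructField("_row_key", DataTypes.StringType, true, Metadata.empty())});
    spark.createDataFrame(Collections.<Row>emptyList(), schema)
        .createOrReplaceTempView("with_schema");
    spark.sql("SELECT _row_key FROM with_schema").show(); // empty result, no exception

    spark.stop();
  }
}
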
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
index 97e4473576c2..c7804631d3f6 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.view.HoodieTablePreCommitFileSystemView;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -135,21 +136,39 @@ public class SparkValidatorUtils {
.collect(Collectors.toList());
if (committedFiles.isEmpty()) {
-      try {
-        return sqlContext.createDataFrame(
-            sqlContext.emptyDataFrame().rdd(),
-            HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(
-                new TableSchemaResolver(table.getMetaClient()).getTableSchema()));
-      } catch (Exception e) {
-        LOG.warn("Cannot get table schema from before state.", e);
-        LOG.warn("Using the schema from after state (current transaction) to create the empty Spark dataframe: {}", newStructTypeSchema);
-        return sqlContext.createDataFrame(
-            sqlContext.emptyDataFrame().rdd(), newStructTypeSchema);
-      }
+      return createEmptyDataFrameWithTableSchema(sqlContext, table, Option.of(newStructTypeSchema));
}
return readRecordsForBaseFiles(sqlContext, committedFiles, table);
}
+  /**
+   * Creates an empty DataFrame with table schema for use when there are no files to read.
+   * If table schema cannot be resolved and a fallback schema is provided, uses that schema;
+   * otherwise returns a schema-less empty DataFrame.
+   *
+   * @param sqlContext Spark {@link SQLContext} instance.
+   * @param table {@link HoodieTable} instance.
+   * @param fallbackSchema Optional schema to use when table schema cannot be resolved (e.g. after state schema); empty to return schema-less empty DataFrame.
+   * @return Empty DataFrame with table or fallback schema, or schema-less if no fallback.
+   */
+  private static Dataset<Row> createEmptyDataFrameWithTableSchema(SQLContext sqlContext,
+                                                                  HoodieTable table,
+                                                                  Option<StructType> fallbackSchema) {
+    try {
+      return sqlContext.createDataFrame(
+          sqlContext.emptyDataFrame().rdd(),
+          HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(
+              new TableSchemaResolver(table.getMetaClient()).getTableSchema()));
+    } catch (Exception e) {
+      LOG.warn("Could not get table schema for empty DataFrame.", e);
+      if (fallbackSchema.isPresent()) {
+        LOG.warn("Using fallback schema to create the empty Spark DataFrame: {}", fallbackSchema.get());
+        return sqlContext.createDataFrame(sqlContext.emptyDataFrame().rdd(), fallbackSchema.get());
+      }
+      return sqlContext.emptyDataFrame();
+    }
+  }
+
/**
* Get records from specified list of data files.
*/
@@ -201,7 +220,9 @@ public class SparkValidatorUtils {
.collect(Collectors.toList());
if (newFiles.isEmpty()) {
- return sqlContext.emptyDataFrame();
+      // Empty write: return empty DataFrame with table schema so validators that reference
+      // columns (e.g. _row_key) do not fail with AnalysisException "Column ... does not exist".
+      return createEmptyDataFrameWithTableSchema(sqlContext, table, Option.empty());
}
return readRecordsForBaseFiles(sqlContext, newFiles, table);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/utils/TestSparkValidatorUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/utils/TestSparkValidatorUtils.java
new file mode 100644
index 000000000000..c53b5ceb6d1c
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/utils/TestSparkValidatorUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import org.apache.spark.api.java.JavaRDD;
+
+import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+
+/**
+ * Tests for {@link SparkValidatorUtils}.
+ * Verifies that pre-commit validators do not fail with
+ * "Column ... does not exist" when running a precommit validation query against an empty write.
+ */
+public class TestSparkValidatorUtils extends HoodieClientTestBase {
+
+  /**
+   * When a SQL validation query references column names (e.g. _row_key) and the write has
+   * no new base files (empty commit), getRecordsFromPendingCommits returns an empty DataFrame.
+   * Without the fix, that DataFrame has no schema, so the validator fails with
+   * AnalysisException: Column '_row_key' does not exist.
+   * This test ensures validation still succeeds by using the resolved table schema for the empty DataFrame.
+   */
+ @Test
+ public void testSqlQueryValidatorWithNoRecords() throws Exception {
+ HoodieWriteConfig writeConfig = getConfigBuilder().build();
+ try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
+
+ // Create initial commit with data so table has schema and data
+ String firstCommit = "001";
+ writeBatch(
+ writeClient,
+ firstCommit,
+ "000",
+ Option.empty(),
+ "000",
+ 10,
+ generateWrapRecordsFn(false, writeConfig, dataGen::generateInserts),
+ SparkRDDWriteClient::bulkInsert,
+ true,
+ 10,
+ 10,
+ 1,
+ false,
+ INSTANT_GENERATOR);
+
+      // Empty insert so the commit goes through the executor and runs pre-commit validators.
+      // Start the commit (creates REQUESTED) so the executor can transition to INFLIGHT; then insert(empty)
+      // runs the executor, which runs validators. getRecordsFromPendingCommits returns empty DataFrame
+      // with table schema; validator must not throw.
+ metaClient.reloadActiveTimeline();
+ String secondCommit = "002";
+      HoodieWriteConfig configWithValidator = getConfigBuilder()
+          .withPreCommitValidatorConfig(
+              HoodiePreCommitValidatorConfig.newBuilder()
+                  .withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName())
+                  .withPrecommitValidatorSingleResultSqlQueries(
+                      "SELECT COUNT(1) AS result FROM (SELECT _row_key FROM <TABLE_NAME> GROUP BY _row_key HAVING COUNT(1) > 1) a#0")
+                  .build())
+          .build();
+
+      try (SparkRDDWriteClient validatorClient = getHoodieWriteClient(configWithValidator)) {
+        WriteClientTestUtils.startCommitWithTime(validatorClient, secondCommit);
+ JavaRDD<HoodieRecord> emptyRecords = jsc.emptyRDD();
+ validatorClient.insert(emptyRecords, secondCommit);
+ }
+
+ metaClient.reloadActiveTimeline();
+      Assertions.assertEquals(2, metaClient.getActiveTimeline().countInstants(),
+          "Should have 2 commits (one with data, one empty)");
+ }
+ }
+}
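
A note on the test's expected value: per Hudi's pre-commit validator config convention, SqlQuerySingleResultPreCommitValidator treats the text after "#" as the expected single-row result, so the "#0" suffix above asserts the duplicate-key probe returns 0. A minimal standalone Spark sketch (not from the commit; the class name, session setup, and view name are illustrative) of why 0 is the right expectation once the empty DataFrame carries the table schema:

import java.util.Collections;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class EmptyWriteValidationQuerySketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate();

    // Empty view that still exposes the _row_key column, like the fixed empty-write DataFrame.
    StructType schema = new StructType(new StructField[]{
        new StructField("_row_key", DataTypes.StringType, true, Metadata.empty())});
    spark.createDataFrame(Collections.<Row>emptyList(), schema)
        .createOrReplaceTempView("empty_write");

    // The test's query with <TABLE_NAME> substituted; the subquery is empty,
    // so COUNT(1) yields a single row containing 0, matching the "#0" expectation.
    Row result = spark.sql(
        "SELECT COUNT(1) AS result FROM "
            + "(SELECT _row_key FROM empty_write GROUP BY _row_key HAVING COUNT(1) > 1) a")
        .first();
    System.out.println(result.getLong(0)); // prints 0

    spark.stop();
  }
}
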