This is an automated email from the ASF dual-hosted git repository.
diqiu50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new ff3f385b4f [#11534] fix(spark-connector): GravitinoGlueCatalog does
not invalidate Iceberg SparkCatalog cache after table mutations (#11559)
ff3f385b4f is described below
commit ff3f385b4fba90ceaea9b57459fa6f55de720b5c
Author: Yuhui <[email protected]>
AuthorDate: Wed Jun 10 21:56:45 2026 +0800
[#11534] fix(spark-connector): GravitinoGlueCatalog does not invalidate
Iceberg SparkCatalog cache after table mutations (#11559)
### What changes were proposed in this pull request?
Override `invalidateTable` in `GravitinoGlueCatalog` to also clear
`icebergGlueCatalog`'s cache. Change `BaseCatalog` mutation methods to
call `invalidateTable(ident)` via virtual dispatch so the override takes
effect.
### Why are the changes needed?
`icebergGlueCatalog`'s internal `CachingCatalog` was never invalidated
after table mutations, causing stale schema reads and
`IllegalStateException: Couldn't find <col> in [...]` after `ALTER TABLE
ADD COLUMNS`.
Fix #11534
### Does this PR introduce _any_ user-facing change?
Yes. The stale-schema error on Iceberg Glue tables after
ALTER/DROP/PURGE/RENAME is fixed.
### How was this patch tested?
- Unit tests in `TestGravitinoGlueCatalog`
- Integration test `testIcebergAlterTableAddColumnCacheInvalidation` in
`SparkGlueCatalogIT`, verified against real AWS Glue + S3 (Spark 3.5)
---
.../spark/connector/catalog/BaseCatalog.java | 8 ++--
.../spark/connector/glue/GravitinoGlueCatalog.java | 18 +++++++-
.../connector/glue/TestGravitinoGlueCatalog.java | 48 ++++++++++++++++++++++
.../integration/test/glue/SparkGlueCatalogIT.java | 32 +++++++++++++++
4 files changed, 101 insertions(+), 5 deletions(-)
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
index 6f93518ad5..228ef8a395 100644
---
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
@@ -279,7 +279,7 @@ public abstract class BaseCatalog implements TableCatalog,
SupportsNamespaces, F
.map(sparkTableChangeConverter::toGravitinoTableChange)
.toArray(org.apache.gravitino.rel.TableChange[]::new);
try {
- sparkCatalog.invalidateTable(ident);
+ invalidateTable(ident);
org.apache.gravitino.rel.Table gravitinoTable =
gravitinoCatalogClient
.asTableCatalog()
@@ -301,7 +301,7 @@ public abstract class BaseCatalog implements TableCatalog,
SupportsNamespaces, F
@Override
public boolean dropTable(Identifier ident) {
- sparkCatalog.invalidateTable(ident);
+ invalidateTable(ident);
return gravitinoCatalogClient
.asTableCatalog()
.dropTable(NameIdentifier.of(getDatabase(ident), ident.name()));
@@ -309,7 +309,7 @@ public abstract class BaseCatalog implements TableCatalog,
SupportsNamespaces, F
@Override
public boolean purgeTable(Identifier ident) {
- sparkCatalog.invalidateTable(ident);
+ invalidateTable(ident);
return gravitinoCatalogClient
.asTableCatalog()
.purgeTable(NameIdentifier.of(getDatabase(ident), ident.name()));
@@ -354,7 +354,7 @@ public abstract class BaseCatalog implements TableCatalog,
SupportsNamespaces, F
org.apache.gravitino.rel.TableChange rename =
org.apache.gravitino.rel.TableChange.rename(newIdent.name());
try {
- sparkCatalog.invalidateTable(oldIdent);
+ invalidateTable(oldIdent);
gravitinoCatalogClient
.asTableCatalog()
.alterTable(NameIdentifier.of(getDatabase(oldIdent),
oldIdent.name()), rename);
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
index 47100103c0..dfcef08b0f 100644
---
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
@@ -69,7 +69,7 @@ public class GravitinoGlueCatalog extends BaseCatalog {
private static final Logger LOG =
LoggerFactory.getLogger(GravitinoGlueCatalog.class);
// Lazily initialized Iceberg GlueCatalog for Iceberg tables
- private volatile SparkCatalog icebergGlueCatalog;
+ volatile SparkCatalog icebergGlueCatalog;
// Store original config for Iceberg catalog initialization
private String catalogName;
@@ -222,6 +222,22 @@ public class GravitinoGlueCatalog extends BaseCatalog {
return new SparkHiveTypeConverter();
}
+ /**
+ * Invalidates both the Hive backend and the Iceberg backend caches for the given table.
+ *
+ * <p>{@link BaseCatalog} only calls {@code sparkCatalog.invalidateTable}, which clears the {@link
+ * HiveTableCatalog} cache. The Iceberg {@link SparkCatalog} maintains its own {@code
+ * CachingCatalog} that must be invalidated separately after any table mutation (ALTER, DROP,
+ * PURGE, RENAME) to avoid stale schema errors on subsequent reads.
+ */
+ @Override
+ public void invalidateTable(Identifier ident) {
+ super.invalidateTable(ident);
+ if (icebergGlueCatalog != null) {
+ icebergGlueCatalog.invalidateTable(ident);
+ }
+ }
+
/**
* Returns true if the Gravitino table is an Iceberg-format table based on
its properties.
*
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
index 757bb6510b..37ad819fa5 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
@@ -21,6 +21,8 @@ package org.apache.gravitino.spark.connector.glue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
@@ -39,8 +41,10 @@ import
org.apache.gravitino.spark.connector.PropertiesConverter;
import org.apache.gravitino.spark.connector.SparkTransformConverter;
import org.apache.gravitino.spark.connector.SparkTypeConverter;
import org.apache.gravitino.spark.connector.catalog.GravitinoCatalogManager;
+import org.apache.iceberg.spark.SparkCatalog;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -243,6 +247,50 @@ public class TestGravitinoGlueCatalog {
Assertions.assertNotNull(transformer);
}
+ // -------------------------------------------------------------------------
+ // Test invalidateTable propagates to icebergGlueCatalog
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testInvalidateTableCallsBothCatalogsWhenIcebergInitialized() throws
Exception {
+ TableCatalog mockSparkCatalog = mock(TableCatalog.class);
+ SparkCatalog mockIcebergCatalog = mock(SparkCatalog.class);
+ Identifier ident = Identifier.of(new String[] {"db"}, "tbl");
+
+ GravitinoGlueCatalog catalog =
+ new GravitinoGlueCatalog() {
+ {
+ sparkCatalog = mockSparkCatalog;
+ icebergGlueCatalog = mockIcebergCatalog;
+ }
+ };
+
+ catalog.invalidateTable(ident);
+
+ verify(mockSparkCatalog).invalidateTable(ident);
+ verify(mockIcebergCatalog).invalidateTable(ident);
+ }
+
+ @Test
+ void testInvalidateTableCallsOnlySparkCatalogWhenIcebergNotInitialized()
throws Exception {
+ TableCatalog mockSparkCatalog = mock(TableCatalog.class);
+ SparkCatalog mockIcebergCatalog = mock(SparkCatalog.class);
+ Identifier ident = Identifier.of(new String[] {"db"}, "tbl");
+
+ GravitinoGlueCatalog catalog =
+ new GravitinoGlueCatalog() {
+ {
+ sparkCatalog = mockSparkCatalog;
+ // icebergGlueCatalog left null (not initialized)
+ }
+ };
+
+ catalog.invalidateTable(ident);
+
+ verify(mockSparkCatalog).invalidateTable(ident);
+ verify(mockIcebergCatalog, never()).invalidateTable(any());
+ }
+
// -------------------------------------------------------------------------
// Helper methods
// -------------------------------------------------------------------------
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java
index db79faf217..da65d3541c 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java
@@ -390,6 +390,38 @@ public abstract class SparkGlueCatalogIT extends
SparkGlueEnvIT {
Assertions.assertEquals("1,name1,25", tableData.get(0));
}
+ /**
+ * Regression test for https://github.com/apache/gravitino/issues/11534.
+ *
+ * <p>After ALTER TABLE ADD COLUMNS on an Iceberg table, querying the new column must succeed.
+ * Previously, icebergGlueCatalog's CachingCatalog was never invalidated after alterTable, causing
+ * {@code IllegalStateException: Couldn't find <newCol> in [<oldCols>]}.
+ */
+ @Test
+ void testIcebergAlterTableAddColumnCacheInvalidation() {
+ String tableName = "test_iceberg_alter_cache";
+ dropTableIfExists(tableName);
+ sql(
+ String.format(
+ "CREATE TABLE %s (id INT COMMENT 'id', name STRING COMMENT 'name')
USING iceberg",
+ tableName));
+ sql(String.format("INSERT INTO %s VALUES (1, 'Alice')", tableName));
+
+ // Warm up the Iceberg SparkCatalog's CachingCatalog with an initial read
+ List<String> before = getTableData(tableName);
+ Assertions.assertEquals(1, before.size());
+ Assertions.assertEquals("1,Alice", before.get(0));
+
+ // Add a column — must invalidate icebergGlueCatalog cache
+ sql(String.format("ALTER TABLE %s ADD COLUMNS (age INT)", tableName));
+
+ // Before the fix this threw:
+ // IllegalStateException: Couldn't find age#N in [id#N, name#N]
+ List<String> after = getQueryData(String.format("SELECT id, name, age FROM
%s", tableName));
+ Assertions.assertEquals(1, after.size());
+ Assertions.assertEquals("1,Alice,NULL", after.get(0));
+ }
+
@Test
void testRenameIcebergTable() {
String oldName = "test_iceberg_rename_old";