This is an automated email from the ASF dual-hosted git repository.
jerryshao pushed a commit to branch branch-1.3
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-1.3 by this push:
new eb7c5e9550 [Cherry-pick to branch-1.3] [#11534] fix(spark-connector):
GravitinoGlueCatalog does not invalidate Iceberg SparkCatalog cache after table
mutations (#11559) (#11577)
eb7c5e9550 is described below
commit eb7c5e955012964d07e0a2aa83a137573bfd9536
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 10 17:27:07 2026 -0700
[Cherry-pick to branch-1.3] [#11534] fix(spark-connector):
GravitinoGlueCatalog does not invalidate Iceberg SparkCatalog cache after table
mutations (#11559) (#11577)
**Cherry-pick Information:**
- Original commit: ff3f385b4fba90ceaea9b57459fa6f55de720b5c
- Target branch: `branch-1.3`
- Status: ✅ Clean cherry-pick (no conflicts)
Co-authored-by: Yuhui <[email protected]>
---
.../spark/connector/catalog/BaseCatalog.java | 8 ++--
.../spark/connector/glue/GravitinoGlueCatalog.java | 18 +++++++-
.../connector/glue/TestGravitinoGlueCatalog.java | 48 ++++++++++++++++++++++
.../integration/test/glue/SparkGlueCatalogIT.java | 32 +++++++++++++++
4 files changed, 101 insertions(+), 5 deletions(-)
diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
index 6f93518ad5..228ef8a395 100644
--- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
+++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/BaseCatalog.java
@@ -279,7 +279,7 @@ public abstract class BaseCatalog implements TableCatalog, SupportsNamespaces, F
.map(sparkTableChangeConverter::toGravitinoTableChange)
.toArray(org.apache.gravitino.rel.TableChange[]::new);
try {
- sparkCatalog.invalidateTable(ident);
+ invalidateTable(ident);
org.apache.gravitino.rel.Table gravitinoTable =
gravitinoCatalogClient
.asTableCatalog()
@@ -301,7 +301,7 @@ public abstract class BaseCatalog implements TableCatalog, SupportsNamespaces, F
@Override
public boolean dropTable(Identifier ident) {
- sparkCatalog.invalidateTable(ident);
+ invalidateTable(ident);
return gravitinoCatalogClient
.asTableCatalog()
.dropTable(NameIdentifier.of(getDatabase(ident), ident.name()));
@@ -309,7 +309,7 @@ public abstract class BaseCatalog implements TableCatalog, SupportsNamespaces, F
@Override
public boolean purgeTable(Identifier ident) {
- sparkCatalog.invalidateTable(ident);
+ invalidateTable(ident);
return gravitinoCatalogClient
.asTableCatalog()
.purgeTable(NameIdentifier.of(getDatabase(ident), ident.name()));
@@ -354,7 +354,7 @@ public abstract class BaseCatalog implements TableCatalog, SupportsNamespaces, F
org.apache.gravitino.rel.TableChange rename =
org.apache.gravitino.rel.TableChange.rename(newIdent.name());
try {
- sparkCatalog.invalidateTable(oldIdent);
+ invalidateTable(oldIdent);
gravitinoCatalogClient
.asTableCatalog()
.alterTable(NameIdentifier.of(getDatabase(oldIdent), oldIdent.name()), rename);
diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
index 47100103c0..dfcef08b0f 100644
--- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
+++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
@@ -69,7 +69,7 @@ public class GravitinoGlueCatalog extends BaseCatalog {
private static final Logger LOG = LoggerFactory.getLogger(GravitinoGlueCatalog.class);
// Lazily initialized Iceberg GlueCatalog for Iceberg tables
- private volatile SparkCatalog icebergGlueCatalog;
+ volatile SparkCatalog icebergGlueCatalog;
// Store original config for Iceberg catalog initialization
private String catalogName;
@@ -222,6 +222,22 @@ public class GravitinoGlueCatalog extends BaseCatalog {
return new SparkHiveTypeConverter();
}
+ /**
+ * Invalidates both the Hive backend and the Iceberg backend caches for the given table.
+ *
+ * <p>{@link BaseCatalog} only calls {@code sparkCatalog.invalidateTable}, which clears the {@link
+ * HiveTableCatalog} cache. The Iceberg {@link SparkCatalog} maintains its own {@code
+ * CachingCatalog} that must be invalidated separately after any table mutation (ALTER, DROP,
+ * PURGE, RENAME) to avoid stale schema errors on subsequent reads.
+ */
+ @Override
+ public void invalidateTable(Identifier ident) {
+ super.invalidateTable(ident);
+ if (icebergGlueCatalog != null) {
+ icebergGlueCatalog.invalidateTable(ident);
+ }
+ }
+
/**
* Returns true if the Gravitino table is an Iceberg-format table based on its properties.
*
diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
index 757bb6510b..37ad819fa5 100644
--- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
+++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
@@ -21,6 +21,8 @@ package org.apache.gravitino.spark.connector.glue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
@@ -39,8 +41,10 @@ import org.apache.gravitino.spark.connector.PropertiesConverter;
import org.apache.gravitino.spark.connector.SparkTransformConverter;
import org.apache.gravitino.spark.connector.SparkTypeConverter;
import org.apache.gravitino.spark.connector.catalog.GravitinoCatalogManager;
+import org.apache.iceberg.spark.SparkCatalog;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -243,6 +247,50 @@ public class TestGravitinoGlueCatalog {
Assertions.assertNotNull(transformer);
}
+ // -------------------------------------------------------------------------
+ // Test invalidateTable propagates to icebergGlueCatalog
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testInvalidateTableCallsBothCatalogsWhenIcebergInitialized() throws Exception {
+ TableCatalog mockSparkCatalog = mock(TableCatalog.class);
+ SparkCatalog mockIcebergCatalog = mock(SparkCatalog.class);
+ Identifier ident = Identifier.of(new String[] {"db"}, "tbl");
+
+ GravitinoGlueCatalog catalog =
+ new GravitinoGlueCatalog() {
+ {
+ sparkCatalog = mockSparkCatalog;
+ icebergGlueCatalog = mockIcebergCatalog;
+ }
+ };
+
+ catalog.invalidateTable(ident);
+
+ verify(mockSparkCatalog).invalidateTable(ident);
+ verify(mockIcebergCatalog).invalidateTable(ident);
+ }
+
+ @Test
+ void testInvalidateTableCallsOnlySparkCatalogWhenIcebergNotInitialized() throws Exception {
+ TableCatalog mockSparkCatalog = mock(TableCatalog.class);
+ SparkCatalog mockIcebergCatalog = mock(SparkCatalog.class);
+ Identifier ident = Identifier.of(new String[] {"db"}, "tbl");
+
+ GravitinoGlueCatalog catalog =
+ new GravitinoGlueCatalog() {
+ {
+ sparkCatalog = mockSparkCatalog;
+ // icebergGlueCatalog left null (not initialized)
+ }
+ };
+
+ catalog.invalidateTable(ident);
+
+ verify(mockSparkCatalog).invalidateTable(ident);
+ verify(mockIcebergCatalog, never()).invalidateTable(any());
+ }
+
// -------------------------------------------------------------------------
// Helper methods
// -------------------------------------------------------------------------
diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java
index 79b9ca4b33..ddcf473710 100644
--- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java
+++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java
@@ -390,6 +390,38 @@ public abstract class SparkGlueCatalogIT extends SparkGlueEnvIT {
Assertions.assertEquals("1,name1,25", tableData.get(0));
}
+ /**
+ * Regression test for https://github.com/apache/gravitino/issues/11534.
+ *
+ * <p>After ALTER TABLE ADD COLUMNS on an Iceberg table, querying the new column must succeed.
+ * Previously, icebergGlueCatalog's CachingCatalog was never invalidated after alterTable, causing
+ * {@code IllegalStateException: Couldn't find <newCol> in [<oldCols>]}.
+ */
+ @Test
+ void testIcebergAlterTableAddColumnCacheInvalidation() {
+ String tableName = "test_iceberg_alter_cache";
+ dropTableIfExists(tableName);
+ sql(
+ String.format(
+ "CREATE TABLE %s (id INT COMMENT 'id', name STRING COMMENT 'name') USING iceberg",
+ tableName));
+ sql(String.format("INSERT INTO %s VALUES (1, 'Alice')", tableName));
+
+ // Warm up the Iceberg SparkCatalog's CachingCatalog with an initial read
+ List<String> before = getTableData(tableName);
+ Assertions.assertEquals(1, before.size());
+ Assertions.assertEquals("1,Alice", before.get(0));
+
+ // Add a column — must invalidate icebergGlueCatalog cache
+ sql(String.format("ALTER TABLE %s ADD COLUMNS (age INT)", tableName));
+
+ // Before the fix this threw:
+ // IllegalStateException: Couldn't find age#N in [id#N, name#N]
+ List<String> after = getQueryData(String.format("SELECT id, name, age FROM %s", tableName));
+ Assertions.assertEquals(1, after.size());
+ Assertions.assertEquals("1,Alice,NULL", after.get(0));
+ }
+
@Test
void testCreateTableWithComment() {
String tableName = "test_table_with_comment";