This is an automated email from the ASF dual-hosted git repository. github-actions[bot] pushed a commit to branch cherry-pick-8e93bde3-to-branch-1.3 in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit 64b29c3147fd99844498480031f84ae713159df3 Author: Yuhui <[email protected]> AuthorDate: Thu Jun 11 18:52:28 2026 +0800 [#11592] fix(spark-connector): Fix INSERT into Glue Iceberg tables with time-based partition transforms (#11593) ### What changes were proposed in this pull request? Override `loadFunction` in `GravitinoGlueCatalog` to delegate to the Iceberg `SparkCatalog`, and add `testIcebergPartitions` to `SparkCommonIT` for the Glue integration test. ### Why are the changes needed? Fix: #11592 `GravitinoGlueCatalog` didn't override `loadFunction`, so Spark couldn't resolve Iceberg built-in partition functions (`days`, `hours`, `months`, `years`) during INSERT planning. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added `testIcebergPartitions` to `SparkCommonIT` and enabled it in `SparkGlueCatalogIT`. Verified against real AWS Glue (`SparkAwsGlueCatalogIT35`). --- .../spark/connector/glue/GravitinoGlueCatalog.java | 21 ++++++ .../connector/glue/TestGravitinoGlueCatalog.java | 40 +++++++++++ .../connector/integration/test/SparkCommonIT.java | 82 +++++++++++++++++++++ .../integration/test/glue/SparkGlueCatalogIT.java | 5 ++ .../test/iceberg/SparkIcebergCatalogIT.java | 84 ++-------------------- 5 files changed, 153 insertions(+), 79 deletions(-) diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java index dfcef08b0f..6ac600140a 100644 --- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java +++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java @@ -37,10 +37,12 @@ import org.apache.iceberg.spark.SparkCatalog; import org.apache.iceberg.spark.source.SparkTable; import org.apache.kyuubi.spark.connector.hive.HiveTable; 
import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog; +import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.Table; import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -222,6 +224,25 @@ public class GravitinoGlueCatalog extends BaseCatalog { return new SparkHiveTypeConverter(); } + /** + * Delegates function lookup to the Iceberg {@link SparkCatalog}, which registers Iceberg built-in + * partition functions ({@code days}, {@code hours}, {@code months}, {@code years}, etc.). Falls + * back to the Gravitino function registry for non-Iceberg functions. + * + * <p>Without this override, {@link org.apache.gravitino.spark.connector.catalog.BaseCatalog} + * would only query the Gravitino server's function registry, which does not include Iceberg + * built-ins. This causes {@code INSERT} into Iceberg tables partitioned by time-based transforms + * to fail with {@code AnalysisException: days(col) is not currently supported}. + */ + @Override + public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException { + try { + return getOrCreateIcebergGlueCatalog().loadFunction(ident); + } catch (NoSuchFunctionException e) { + return super.loadFunction(ident); + } + } + /** * Invalidates both the Hive backend and the Iceberg backend caches for the given table. 
* diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java index 37ad819fa5..af980f7309 100644 --- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java +++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java @@ -20,6 +20,7 @@ package org.apache.gravitino.spark.connector.glue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -42,9 +43,11 @@ import org.apache.gravitino.spark.connector.SparkTransformConverter; import org.apache.gravitino.spark.connector.SparkTypeConverter; import org.apache.gravitino.spark.connector.catalog.GravitinoCatalogManager; import org.apache.iceberg.spark.SparkCatalog; +import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -291,10 +294,47 @@ public class TestGravitinoGlueCatalog { verify(mockIcebergCatalog, never()).invalidateTable(any()); } + // ------------------------------------------------------------------------- + // Test loadFunction delegation to Iceberg catalog + // ------------------------------------------------------------------------- + + @Test + void testLoadFunctionDelegatesToIcebergCatalog() throws NoSuchFunctionException { + SparkCatalog mockIcebergCatalog 
= mock(SparkCatalog.class); + UnboundFunction mockFunction = mock(UnboundFunction.class); + Identifier ident = Identifier.of(new String[] {}, "years"); + when(mockIcebergCatalog.loadFunction(ident)).thenReturn(mockFunction); + + Assertions.assertSame( + mockFunction, makeGlueCatalogWithIceberg(mockIcebergCatalog).loadFunction(ident)); + } + + @Test + void testLoadFunctionFallsBackToSuperWhenIcebergThrows() throws NoSuchFunctionException { + SparkCatalog mockIcebergCatalog = mock(SparkCatalog.class); + Identifier ident = Identifier.of(new String[] {}, "custom_func"); + doThrow(mock(NoSuchFunctionException.class)).when(mockIcebergCatalog).loadFunction(any()); + + GravitinoGlueCatalog catalog = makeGlueCatalogWithIceberg(mockIcebergCatalog); + // When Iceberg throws NoSuchFunctionException, the catch block calls super.loadFunction. + // In the test environment, super also throws; verify Iceberg was consulted first. + Assertions.assertThrows(Exception.class, () -> catalog.loadFunction(ident)); + verify(mockIcebergCatalog).loadFunction(ident); + } + // ------------------------------------------------------------------------- // Helper methods // ------------------------------------------------------------------------- + /** Creates a GravitinoGlueCatalog with the given Iceberg catalog pre-injected. */ + private GravitinoGlueCatalog makeGlueCatalogWithIceberg(SparkCatalog icebergCatalog) { + return new GravitinoGlueCatalog() { + { + icebergGlueCatalog = icebergCatalog; + } + }; + } + /** Creates a mock Gravitino Table with the given properties. 
*/ private Table createMockGravitinoTable(java.util.Map<String, String> properties) { Table mockTable = mock(Table.class); diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java index 7f637ff78b..b743787498 100644 --- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java +++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -129,6 +130,11 @@ public abstract class SparkCommonIT extends SparkEnvIT { return true; } + /** Returns whether this catalog supports Iceberg time-based partition transforms. */ + protected boolean supportsIcebergPartitionTransforms() { + return false; + } + protected SparkTableInfoChecker getTableInfoChecker() { return SparkTableInfoChecker.create(); } @@ -1059,4 +1065,80 @@ public abstract class SparkCommonIT extends SparkEnvIT { String location = tableInfo.getTableLocation(); Assertions.assertDoesNotThrow(() -> getSparkSession().read().parquet(location).printSchema()); } + + /** + * Returns column definitions for a simple Iceberg table with integer, string, and timestamp + * columns. + */ + protected List<SparkColumnInfo> getIcebergSimpleTableColumn() { + return Arrays.asList( + SparkColumnInfo.of("id", DataTypes.IntegerType, "id comment"), + SparkColumnInfo.of("name", DataTypes.StringType, ""), + SparkColumnInfo.of("ts", DataTypes.TimestampType, null)); + } + + /** Returns the CREATE TABLE SQL for a simple Iceberg table with id, name, and ts columns. 
*/ + protected String getCreateIcebergSimpleTableString(String tableName) { + return String.format( + "CREATE TABLE %s (id INT COMMENT 'id comment', name STRING COMMENT '', ts TIMESTAMP)", + tableName); + } + + @Test + @EnabledIf("supportsIcebergPartitionTransforms") + protected void testIcebergPartitions() { + Map<String, String> partitionPaths = + ImmutableMap.of( + "years", "name=a/name_trunc=a/id_bucket=4/ts_year=2024", + "months", "name=a/name_trunc=a/id_bucket=4/ts_month=2024-01", + "days", "name=a/name_trunc=a/id_bucket=4/ts_day=2024-01-01", + "hours", "name=a/name_trunc=a/id_bucket=4/ts_hour=2024-01-01-12"); + partitionPaths + .keySet() + .forEach( + func -> { + String tableName = String.format("test_iceberg_%s_partition_table", func); + dropTableIfExists(tableName); + sql( + getCreateIcebergSimpleTableString(tableName) + + String.format( + " USING iceberg PARTITIONED BY (name, truncate(1, name), bucket(16, id), %s(ts));", + func)); + SparkTableInfo tableInfo = getTableInfo(tableName); + SparkTableInfoChecker checker = + SparkTableInfoChecker.create() + .withName(tableName) + .withColumns(getIcebergSimpleTableColumn()) + .withIdentifyPartition(Collections.singletonList("name")) + .withTruncatePartition(1, "name") + .withBucketPartition(16, Collections.singletonList("id")); + switch (func) { + case "years": + checker.withYearPartition("ts"); + break; + case "months": + checker.withMonthPartition("ts"); + break; + case "days": + checker.withDayPartition("ts"); + break; + case "hours": + checker.withHourPartition("ts"); + break; + default: + throw new IllegalArgumentException("Unsupported partition function: " + func); + } + checker.check(tableInfo); + + sql( + String.format( + "INSERT into %s values(2,'a',cast('2024-01-01 12:00:00' as timestamp));", + tableName)); + List<String> queryResult = getTableData(tableName); + Assertions.assertEquals(1, queryResult.size()); + Assertions.assertEquals("2,a,2024-01-01 12:00:00", queryResult.get(0)); + checkDirExists( + 
new Path(tableInfo.getTableLocation(), "data/" + partitionPaths.get(func))); + }); + } } diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java index da65d3541c..c981ab646b 100644 --- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java +++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java @@ -125,6 +125,11 @@ public abstract class SparkGlueCatalogIT extends SparkGlueEnvIT { return false; } + @Override + protected boolean supportsIcebergPartitionTransforms() { + return true; + } + @BeforeAll @Override protected void startUp() throws Exception { diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java index bb692ac687..aa3552a933 100644 --- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java +++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java @@ -24,7 +24,6 @@ import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -125,6 +124,11 @@ public abstract class SparkIcebergCatalogIT extends SparkCommonIT { return true; } + @Override + protected boolean supportsIcebergPartitionTransforms() { + return true; + } + @Override protected String getTableLocation(SparkTableInfo table) { return 
String.join(File.separator, table.getTableLocation(), "data"); @@ -213,67 +217,6 @@ public abstract class SparkIcebergCatalogIT extends SparkCommonIT { }); } - @Test - void testIcebergPartitions() { - Map<String, String> partitionPaths = new HashMap<>(); - partitionPaths.put("years", "name=a/name_trunc=a/id_bucket=4/ts_year=2024"); - partitionPaths.put("months", "name=a/name_trunc=a/id_bucket=4/ts_month=2024-01"); - partitionPaths.put("days", "name=a/name_trunc=a/id_bucket=4/ts_day=2024-01-01"); - partitionPaths.put("hours", "name=a/name_trunc=a/id_bucket=4/ts_hour=2024-01-01-12"); - - partitionPaths - .keySet() - .forEach( - func -> { - String tableName = String.format("test_iceberg_%s_partition_table", func); - dropTableIfExists(tableName); - String createTableSQL = getCreateIcebergSimpleTableString(tableName); - createTableSQL = - createTableSQL - + String.format( - " PARTITIONED BY (name, truncate(1, name), bucket(16, id), %s(ts));", - func); - sql(createTableSQL); - SparkTableInfo tableInfo = getTableInfo(tableName); - SparkTableInfoChecker checker = - SparkTableInfoChecker.create() - .withName(tableName) - .withColumns(getIcebergSimpleTableColumn()) - .withIdentifyPartition(Collections.singletonList("name")) - .withTruncatePartition(1, "name") - .withBucketPartition(16, Collections.singletonList("id")); - switch (func) { - case "years": - checker.withYearPartition("ts"); - break; - case "months": - checker.withMonthPartition("ts"); - break; - case "days": - checker.withDayPartition("ts"); - break; - case "hours": - checker.withHourPartition("ts"); - break; - default: - throw new IllegalArgumentException("UnSupported partition function: " + func); - } - checker.check(tableInfo); - - String insertData = - String.format( - "INSERT into %s values(2,'a',cast('2024-01-01 12:00:00' as timestamp));", - tableName); - sql(insertData); - List<String> queryResult = getTableData(tableName); - Assertions.assertEquals(1, queryResult.size()); - 
Assertions.assertEquals("2,a,2024-01-01 12:00:00", queryResult.get(0)); - String partitionExpression = partitionPaths.get(func); - Path partitionPath = new Path(getTableLocation(tableInfo), partitionExpression); - checkDirExists(partitionPath); - }); - } - @Test void testIcebergMetadataColumns() throws NoSuchTableException { testMetadataColumns(); @@ -1173,23 +1116,6 @@ public abstract class SparkIcebergCatalogIT extends SparkCommonIT { Assertions.assertEquals("1,1,1;1,1,1;1,1,1;1,1,1", String.join(";", queryResult)); } - private List<SparkTableInfo.SparkColumnInfo> getIcebergSimpleTableColumn() { - return Arrays.asList( - SparkTableInfo.SparkColumnInfo.of("id", DataTypes.IntegerType, "id comment"), - SparkTableInfo.SparkColumnInfo.of("name", DataTypes.StringType, ""), - SparkTableInfo.SparkColumnInfo.of("ts", DataTypes.TimestampType, null)); - } - - /** - * Here we build a new `createIcebergSql` String for creating a table with a field of timestamp - * type to create the year/month,etc. partitions - */ - private String getCreateIcebergSimpleTableString(String tableName) { - return String.format( - "CREATE TABLE %s (id INT COMMENT 'id comment', name STRING COMMENT '', ts TIMESTAMP)", - tableName); - } - protected SparkMetadataColumnInfo[] getIcebergMetadataColumns() { return new SparkMetadataColumnInfo[] { new SparkMetadataColumnInfo("_spec_id", DataTypes.IntegerType, false),
