This is an automated email from the ASF dual-hosted git repository.
mchades pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 8e93bde39d [#11592] fix(spark-connector): Fix INSERT into Glue Iceberg
tables with time-based partition transforms (#11593)
8e93bde39d is described below
commit 8e93bde39db4d26c5d7a5046b1a18487868ec0f3
Author: Yuhui <[email protected]>
AuthorDate: Thu Jun 11 18:52:28 2026 +0800
[#11592] fix(spark-connector): Fix INSERT into Glue Iceberg tables with
time-based partition transforms (#11593)
### What changes were proposed in this pull request?
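Override `loadFunction` in `GravitinoGlueCatalog` to delegate to the
Iceberg `SparkCatalog`, falling back to the Gravitino function registry
for non-Iceberg functions. Also move `testIcebergPartitions` from
`SparkIcebergCatalogIT` into `SparkCommonIT` so that the Glue integration
tests run it too.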
### Why are the changes needed?
Fix: #11592
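`GravitinoGlueCatalog` did not override `loadFunction`, so function lookups
reached only the Gravitino server's function registry. That registry does not
include the Iceberg built-in partition functions (`days`, `hours`, `months`,
`years`). As a result, INSERT planning for Iceberg tables partitioned by these
transforms failed with `AnalysisException: days(col) is not currently
supported`.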
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
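Moved `testIcebergPartitions` into `SparkCommonIT`, gated by the new
`supportsIcebergPartitionTransforms()` hook, and enabled it in both
`SparkGlueCatalogIT` and `SparkIcebergCatalogIT`. Added unit tests in
`TestGravitinoGlueCatalog` covering the `loadFunction` delegation and its
fallback path. Verified against a real AWS Glue catalog
(`SparkAwsGlueCatalogIT35`).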
---
.../spark/connector/glue/GravitinoGlueCatalog.java | 21 ++++++
.../connector/glue/TestGravitinoGlueCatalog.java | 40 +++++++++++
.../connector/integration/test/SparkCommonIT.java | 82 +++++++++++++++++++++
.../integration/test/glue/SparkGlueCatalogIT.java | 5 ++
.../test/iceberg/SparkIcebergCatalogIT.java | 84 ++--------------------
5 files changed, 153 insertions(+), 79 deletions(-)
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
index dfcef08b0f..6ac600140a 100644
---
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/glue/GravitinoGlueCatalog.java
@@ -37,10 +37,12 @@ import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.kyuubi.spark.connector.hive.HiveTable;
import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -222,6 +224,25 @@ public class GravitinoGlueCatalog extends BaseCatalog {
return new SparkHiveTypeConverter();
}
+ /**
+ * Delegates function lookup to the Iceberg {@link SparkCatalog}, which
registers Iceberg built-in
+ * partition functions ({@code days}, {@code hours}, {@code months}, {@code
years}, etc.). Falls
+ * back to the Gravitino function registry for non-Iceberg functions.
+ *
+ * <p>Without this override, {@link
org.apache.gravitino.spark.connector.catalog.BaseCatalog}
+ * would only query the Gravitino server's function registry, which does not
include Iceberg
+ * built-ins. This causes {@code INSERT} into Iceberg tables partitioned by
time-based transforms
+ * to fail with {@code AnalysisException: days(col) is not currently
supported}.
+ */
+ @Override
+ public UnboundFunction loadFunction(Identifier ident) throws
NoSuchFunctionException {
+ try {
+ return getOrCreateIcebergGlueCatalog().loadFunction(ident);
+ } catch (NoSuchFunctionException e) {
+ return super.loadFunction(ident);
+ }
+ }
+
/**
* Invalidates both the Hive backend and the Iceberg backend caches for the
given table.
*
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
index 37ad819fa5..af980f7309 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/glue/TestGravitinoGlueCatalog.java
@@ -20,6 +20,7 @@
package org.apache.gravitino.spark.connector.glue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -42,9 +43,11 @@ import
org.apache.gravitino.spark.connector.SparkTransformConverter;
import org.apache.gravitino.spark.connector.SparkTypeConverter;
import org.apache.gravitino.spark.connector.catalog.GravitinoCatalogManager;
import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -291,10 +294,47 @@ public class TestGravitinoGlueCatalog {
verify(mockIcebergCatalog, never()).invalidateTable(any());
}
+ // -------------------------------------------------------------------------
+ // Test loadFunction delegation to Iceberg catalog
+ // -------------------------------------------------------------------------
+
+ @Test
+ void testLoadFunctionDelegatesToIcebergCatalog() throws
NoSuchFunctionException {
+ SparkCatalog mockIcebergCatalog = mock(SparkCatalog.class);
+ UnboundFunction mockFunction = mock(UnboundFunction.class);
+ Identifier ident = Identifier.of(new String[] {}, "years");
+ when(mockIcebergCatalog.loadFunction(ident)).thenReturn(mockFunction);
+
+ Assertions.assertSame(
+ mockFunction,
makeGlueCatalogWithIceberg(mockIcebergCatalog).loadFunction(ident));
+ }
+
+ @Test
+ void testLoadFunctionFallsBackToSuperWhenIcebergThrows() throws
NoSuchFunctionException {
+ SparkCatalog mockIcebergCatalog = mock(SparkCatalog.class);
+ Identifier ident = Identifier.of(new String[] {}, "custom_func");
+
doThrow(mock(NoSuchFunctionException.class)).when(mockIcebergCatalog).loadFunction(any());
+
+ GravitinoGlueCatalog catalog =
makeGlueCatalogWithIceberg(mockIcebergCatalog);
+ // When Iceberg throws NoSuchFunctionException, the catch block calls
super.loadFunction.
+ // In the test environment, super also throws; verify Iceberg was
consulted first.
+ Assertions.assertThrows(Exception.class, () ->
catalog.loadFunction(ident));
+ verify(mockIcebergCatalog).loadFunction(ident);
+ }
+
// -------------------------------------------------------------------------
// Helper methods
// -------------------------------------------------------------------------
+ /** Creates a GravitinoGlueCatalog with the given Iceberg catalog
pre-injected. */
+ private GravitinoGlueCatalog makeGlueCatalogWithIceberg(SparkCatalog
icebergCatalog) {
+ return new GravitinoGlueCatalog() {
+ {
+ icebergGlueCatalog = icebergCatalog;
+ }
+ };
+ }
+
/** Creates a mock Gravitino Table with the given properties. */
private Table createMockGravitinoTable(java.util.Map<String, String>
properties) {
Table mockTable = mock(Table.class);
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
index 7f637ff78b..b743787498 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/SparkCommonIT.java
@@ -23,6 +23,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -129,6 +130,11 @@ public abstract class SparkCommonIT extends SparkEnvIT {
return true;
}
+ /** Returns whether this catalog supports Iceberg time-based partition
transforms. */
+ protected boolean supportsIcebergPartitionTransforms() {
+ return false;
+ }
+
protected SparkTableInfoChecker getTableInfoChecker() {
return SparkTableInfoChecker.create();
}
@@ -1059,4 +1065,80 @@ public abstract class SparkCommonIT extends SparkEnvIT {
String location = tableInfo.getTableLocation();
Assertions.assertDoesNotThrow(() ->
getSparkSession().read().parquet(location).printSchema());
}
+
+ /**
+ * Returns column definitions for a simple Iceberg table with integer,
string, and timestamp
+ * columns.
+ */
+ protected List<SparkColumnInfo> getIcebergSimpleTableColumn() {
+ return Arrays.asList(
+ SparkColumnInfo.of("id", DataTypes.IntegerType, "id comment"),
+ SparkColumnInfo.of("name", DataTypes.StringType, ""),
+ SparkColumnInfo.of("ts", DataTypes.TimestampType, null));
+ }
+
+ /** Returns the CREATE TABLE SQL for a simple Iceberg table with id, name,
and ts columns. */
+ protected String getCreateIcebergSimpleTableString(String tableName) {
+ return String.format(
+ "CREATE TABLE %s (id INT COMMENT 'id comment', name STRING COMMENT '',
ts TIMESTAMP)",
+ tableName);
+ }
+
+ @Test
+ @EnabledIf("supportsIcebergPartitionTransforms")
+ protected void testIcebergPartitions() {
+ Map<String, String> partitionPaths =
+ ImmutableMap.of(
+ "years", "name=a/name_trunc=a/id_bucket=4/ts_year=2024",
+ "months", "name=a/name_trunc=a/id_bucket=4/ts_month=2024-01",
+ "days", "name=a/name_trunc=a/id_bucket=4/ts_day=2024-01-01",
+ "hours", "name=a/name_trunc=a/id_bucket=4/ts_hour=2024-01-01-12");
+ partitionPaths
+ .keySet()
+ .forEach(
+ func -> {
+ String tableName =
String.format("test_iceberg_%s_partition_table", func);
+ dropTableIfExists(tableName);
+ sql(
+ getCreateIcebergSimpleTableString(tableName)
+ + String.format(
+ " USING iceberg PARTITIONED BY (name, truncate(1,
name), bucket(16, id), %s(ts));",
+ func));
+ SparkTableInfo tableInfo = getTableInfo(tableName);
+ SparkTableInfoChecker checker =
+ SparkTableInfoChecker.create()
+ .withName(tableName)
+ .withColumns(getIcebergSimpleTableColumn())
+ .withIdentifyPartition(Collections.singletonList("name"))
+ .withTruncatePartition(1, "name")
+ .withBucketPartition(16,
Collections.singletonList("id"));
+ switch (func) {
+ case "years":
+ checker.withYearPartition("ts");
+ break;
+ case "months":
+ checker.withMonthPartition("ts");
+ break;
+ case "days":
+ checker.withDayPartition("ts");
+ break;
+ case "hours":
+ checker.withHourPartition("ts");
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported partition
function: " + func);
+ }
+ checker.check(tableInfo);
+
+ sql(
+ String.format(
+ "INSERT into %s values(2,'a',cast('2024-01-01 12:00:00'
as timestamp));",
+ tableName));
+ List<String> queryResult = getTableData(tableName);
+ Assertions.assertEquals(1, queryResult.size());
+ Assertions.assertEquals("2,a,2024-01-01 12:00:00",
queryResult.get(0));
+ checkDirExists(
+ new Path(tableInfo.getTableLocation(), "data/" +
partitionPaths.get(func)));
+ });
+ }
}
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java
index da65d3541c..c981ab646b 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/glue/SparkGlueCatalogIT.java
@@ -125,6 +125,11 @@ public abstract class SparkGlueCatalogIT extends
SparkGlueEnvIT {
return false;
}
+ @Override
+ protected boolean supportsIcebergPartitionTransforms() {
+ return true;
+ }
+
@BeforeAll
@Override
protected void startUp() throws Exception {
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
index bb692ac687..aa3552a933 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogIT.java
@@ -24,7 +24,6 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -125,6 +124,11 @@ public abstract class SparkIcebergCatalogIT extends
SparkCommonIT {
return true;
}
+ @Override
+ protected boolean supportsIcebergPartitionTransforms() {
+ return true;
+ }
+
@Override
protected String getTableLocation(SparkTableInfo table) {
return String.join(File.separator, table.getTableLocation(), "data");
@@ -213,67 +217,6 @@ public abstract class SparkIcebergCatalogIT extends
SparkCommonIT {
});
}
- @Test
- void testIcebergPartitions() {
- Map<String, String> partitionPaths = new HashMap<>();
- partitionPaths.put("years",
"name=a/name_trunc=a/id_bucket=4/ts_year=2024");
- partitionPaths.put("months",
"name=a/name_trunc=a/id_bucket=4/ts_month=2024-01");
- partitionPaths.put("days",
"name=a/name_trunc=a/id_bucket=4/ts_day=2024-01-01");
- partitionPaths.put("hours",
"name=a/name_trunc=a/id_bucket=4/ts_hour=2024-01-01-12");
-
- partitionPaths
- .keySet()
- .forEach(
- func -> {
- String tableName =
String.format("test_iceberg_%s_partition_table", func);
- dropTableIfExists(tableName);
- String createTableSQL =
getCreateIcebergSimpleTableString(tableName);
- createTableSQL =
- createTableSQL
- + String.format(
- " PARTITIONED BY (name, truncate(1, name),
bucket(16, id), %s(ts));",
- func);
- sql(createTableSQL);
- SparkTableInfo tableInfo = getTableInfo(tableName);
- SparkTableInfoChecker checker =
- SparkTableInfoChecker.create()
- .withName(tableName)
- .withColumns(getIcebergSimpleTableColumn())
- .withIdentifyPartition(Collections.singletonList("name"))
- .withTruncatePartition(1, "name")
- .withBucketPartition(16,
Collections.singletonList("id"));
- switch (func) {
- case "years":
- checker.withYearPartition("ts");
- break;
- case "months":
- checker.withMonthPartition("ts");
- break;
- case "days":
- checker.withDayPartition("ts");
- break;
- case "hours":
- checker.withHourPartition("ts");
- break;
- default:
- throw new IllegalArgumentException("UnSupported partition
function: " + func);
- }
- checker.check(tableInfo);
-
- String insertData =
- String.format(
- "INSERT into %s values(2,'a',cast('2024-01-01 12:00:00'
as timestamp));",
- tableName);
- sql(insertData);
- List<String> queryResult = getTableData(tableName);
- Assertions.assertEquals(1, queryResult.size());
- Assertions.assertEquals("2,a,2024-01-01 12:00:00",
queryResult.get(0));
- String partitionExpression = partitionPaths.get(func);
- Path partitionPath = new Path(getTableLocation(tableInfo),
partitionExpression);
- checkDirExists(partitionPath);
- });
- }
-
@Test
void testIcebergMetadataColumns() throws NoSuchTableException {
testMetadataColumns();
@@ -1173,23 +1116,6 @@ public abstract class SparkIcebergCatalogIT extends
SparkCommonIT {
Assertions.assertEquals("1,1,1;1,1,1;1,1,1;1,1,1", String.join(";",
queryResult));
}
- private List<SparkTableInfo.SparkColumnInfo> getIcebergSimpleTableColumn() {
- return Arrays.asList(
- SparkTableInfo.SparkColumnInfo.of("id", DataTypes.IntegerType, "id
comment"),
- SparkTableInfo.SparkColumnInfo.of("name", DataTypes.StringType, ""),
- SparkTableInfo.SparkColumnInfo.of("ts", DataTypes.TimestampType,
null));
- }
-
- /**
- * Here we build a new `createIcebergSql` String for creating a table with a
field of timestamp
- * type to create the year/month,etc. partitions
- */
- private String getCreateIcebergSimpleTableString(String tableName) {
- return String.format(
- "CREATE TABLE %s (id INT COMMENT 'id comment', name STRING COMMENT '',
ts TIMESTAMP)",
- tableName);
- }
-
protected SparkMetadataColumnInfo[] getIcebergMetadataColumns() {
return new SparkMetadataColumnInfo[] {
new SparkMetadataColumnInfo("_spec_id", DataTypes.IntegerType, false),