This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 29649d01f5 [#7442] feat(iceberg): Support passing catalog information
by warehouse query param in IRC (#7444)
29649d01f5 is described below
commit 29649d01f5afc1fcce830f05c32d5081636d7915
Author: Bharath Krishna <[email protected]>
AuthorDate: Sun Jun 29 20:06:21 2025 -0700
[#7442] feat(iceberg): Support passing catalog information by warehouse
query param in IRC (#7444)
### What changes were proposed in this pull request?
Support passing catalog information through the `warehouse` query param in
IRC. This also removes the need to rely on the `prefix` param.
### Why are the changes needed?
Warehouse param is the Iceberg REST spec recommended way of passing the
information to the config API, as seen in `RESTSessionCatalog`
(https://github.com/apache/iceberg/blob/bcb68800e1a9946fced40c006c6b55ec02445c1c/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java#L1014)
Fix: #7442
### Does this PR introduce _any_ user-facing change?
Users can now use the `warehouse` property to specify the catalog name. This
provides the ability to control the behavior of the Iceberg client by the IRC
server.
### How was this patch tested?
- Added unit tests
- Also, tested with spark-sql client, results below:
- When warehouse and prefix are not set, this is the error seen in
spark-sql client
```
spark-sql (default)> select * from sbschema.bmurali_iceberg_table2;
[TABLE_OR_VIEW_NOT_FOUND] The table or view
`sbschema`.`bmurali_iceberg_table2` cannot be found. Verify the spelling and
correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema()
output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF
EXISTS.; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation [sbschema, bmurali_iceberg_table2], [], false
```
In the Gravitino server logs I see:
```
2025-06-25 06:53:31.209 INFO [iceberg-rest-30]
[org.apache.gravitino.iceberg.common.utils.IcebergCatalogUtil.loadCatalogBackend(IcebergCatalogUtil.java:179)]
- Load catalog backend of MEMORY
2025-06-25 06:53:31.231 INFO [iceberg-rest-30]
[org.apache.gravitino.iceberg.service.IcebergExceptionMapper.toResponse(IcebergExceptionMapper.java:80)]
- Iceberg REST server error maybe caused by user request, response http
status: 404, exception: class
org.apache.iceberg.exceptions.NoSuchTableException, exception message: Table
does not exist: sbschema.bmurali_iceberg_table2
```
Seems like it tried to use the memory catalog and eventually failed to find
the table, which is expected.
When I set `warehouse` to a non-existent value, like
```
spark.sql.catalog.spark_catalog.warehouse=invalid
```
client says
```
Spark master: local[*], Application Id: local-1750834613853
Exception in thread "main"
org.apache.iceberg.exceptions.ServiceFailureException: Server error:
RuntimeException: Couldn't find Iceberg configuration for invalid
```
This is also expected behavior
Tested via API:
```
Created a test catalog:
curl -X POST -H "Accept: application/vnd.gravitino.v1+json" -H
"Content-Type: application/json" -d '{
"name":"s3_iceberg_catalog",
"type":"RELATIONAL",
"provider":"lakehouse-iceberg",
"comment":"S3 Iceberg catalog for testing config",
"properties":{
"catalog-backend":"memory",
"uri":"memory://catalog",
"warehouse":"s3a://test-bucket/warehouse/",
"io-impl":"org.apache.iceberg.aws.s3.S3FileIO",
"s3.endpoint":"http://localhost:9000",
"s3.access-key-id":"test-access-key",
"s3.secret-access-key":"test-secret-key",
"s3.region":"us-east-1"
}
}' http://localhost:8090/api/metalakes/test_metalake/catalogs
Fetched the configs of this catalog:
curl -X GET "http://localhost:9001/iceberg/v1/config?warehouse=s3_iceberg_catalog"
{"defaults":{"io-impl":"org.apache.iceberg.aws.s3.S3FileIO","prefix":"s3_iceberg_catalog"},"overrides":{}}%
```
---
.../integration/test/CatalogIcebergBaseIT.java | 8 +++-
.../test/iceberg/FlinkIcebergRestCatalogIT.java | 2 -
.../iceberg/service/CatalogWrapperForREST.java | 2 +-
.../service/IcebergCatalogWrapperManager.java | 4 +-
.../iceberg/service/IcebergExceptionMapper.java | 2 +
.../service/rest/IcebergConfigOperations.java | 27 ++++++++++-
.../iceberg/service/rest/IcebergRestTestUtil.java | 11 +++++
.../iceberg/service/rest/TestIcebergConfig.java | 54 ++++++++++++++++++++++
.../iceberg/SparkIcebergCatalogRestBackendIT.java | 2 -
9 files changed, 102 insertions(+), 10 deletions(-)
diff --git
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
index f0162a6ec8..560c9272cc 100644
---
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
+++
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java
@@ -212,12 +212,16 @@ public abstract class CatalogIcebergBaseIT extends BaseIT
{
catalogProperties.put(IcebergConfig.CATALOG_BACKEND.getKey(), TYPE);
catalogProperties.put(IcebergConfig.CATALOG_URI.getKey(), URIS);
- catalogProperties.put(IcebergConfig.CATALOG_WAREHOUSE.getKey(), WAREHOUSE);
+ if (!"rest".equalsIgnoreCase(TYPE)) {
+ catalogProperties.put(IcebergConfig.CATALOG_WAREHOUSE.getKey(),
WAREHOUSE);
+ }
catalogProperties.put(IcebergConfig.CATALOG_BACKEND_NAME.getKey(),
icebergCatalogBackendName);
Map<String, String> icebergCatalogProperties = Maps.newHashMap();
icebergCatalogProperties.put(IcebergConfig.CATALOG_URI.getKey(), URIS);
- icebergCatalogProperties.put(IcebergConfig.CATALOG_WAREHOUSE.getKey(),
WAREHOUSE);
+ if (!"rest".equalsIgnoreCase(TYPE)) {
+ icebergCatalogProperties.put(IcebergConfig.CATALOG_WAREHOUSE.getKey(),
WAREHOUSE);
+ }
icebergCatalogProperties.put(
IcebergConfig.CATALOG_BACKEND_NAME.getKey(),
icebergCatalogBackendName);
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergRestCatalogIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergRestCatalogIT.java
index e8a5b4eb06..c65a1191e8 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergRestCatalogIT.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergRestCatalogIT.java
@@ -38,8 +38,6 @@ public class FlinkIcebergRestCatalogIT extends
FlinkIcebergCatalogIT {
catalogProperties.put(
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST);
- catalogProperties.put(
- IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
warehouse);
catalogProperties.put(
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
icebergRestServiceUri);
return catalogProperties;
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
index 3c86629b52..9fcf71393e 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
@@ -106,7 +106,7 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
}
}
- private Map<String, String> getCatalogConfigToClient() {
+ public Map<String, String> getCatalogConfigToClient() {
return catalogConfigToClients;
}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
index 7b3e18109f..1958d857e6 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
@@ -89,7 +90,8 @@ public class IcebergCatalogWrapperManager implements
AutoCloseable {
private CatalogWrapperForREST createCatalogWrapper(String catalogName) {
Optional<IcebergConfig> icebergConfig =
configProvider.getIcebergCatalogConfig(catalogName);
if (!icebergConfig.isPresent()) {
- throw new RuntimeException("Couldn't find Iceberg configuration for " +
catalogName);
+ throw new NoSuchCatalogException(
+ "Couldn't find Iceberg configuration for catalog %s", catalogName);
}
return createCatalogWrapper(catalogName, icebergConfig.get());
}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java
index ed7d0a2f98..f89617652e 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java
@@ -25,6 +25,7 @@ import javax.ws.rs.core.Response.Status;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;
import org.apache.gravitino.exceptions.IllegalNameIdentifierException;
+import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
@@ -60,6 +61,7 @@ public class IcebergExceptionMapper implements
ExceptionMapper<Exception> {
.put(NoSuchNamespaceException.class, 404)
.put(NoSuchTableException.class, 404)
.put(NoSuchIcebergTableException.class, 404)
+ .put(NoSuchCatalogException.class, 404)
.put(UnsupportedOperationException.class, 406)
.put(NoSuchViewException.class, 404)
.put(AlreadyExistsException.class, 409)
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergConfigOperations.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergConfigOperations.java
index 15fef0e371..96f763794e 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergConfigOperations.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergConfigOperations.java
@@ -20,14 +20,19 @@ package org.apache.gravitino.iceberg.service.rest;
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
+import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import org.apache.gravitino.iceberg.service.CatalogWrapperForREST;
+import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
import org.apache.gravitino.iceberg.service.IcebergRestUtils;
import org.apache.gravitino.metrics.MetricNames;
import org.apache.iceberg.rest.responses.ConfigResponse;
@@ -41,12 +46,30 @@ public class IcebergConfigOperations {
@Context
private HttpServletRequest httpRequest;
+ private final IcebergCatalogWrapperManager catalogWrapperManager;
+
+ @Inject
+ public IcebergConfigOperations(IcebergCatalogWrapperManager
catalogWrapperManager) {
+ this.catalogWrapperManager = catalogWrapperManager;
+ }
+
@GET
@Produces(MediaType.APPLICATION_JSON)
@Timed(name = "config." + MetricNames.HTTP_PROCESS_DURATION, absolute = true)
@ResponseMetered(name = "config", absolute = true)
- public Response getConfig() {
- ConfigResponse response = ConfigResponse.builder().build();
+ public Response getConfig(@DefaultValue("") @QueryParam("warehouse") String
warehouse) {
+ if (warehouse == null || warehouse.isEmpty()) {
+ ConfigResponse response = ConfigResponse.builder().build();
+ return IcebergRestUtils.ok(response);
+ }
+
+ // Get the catalog wrapper which contains the configuration
+ CatalogWrapperForREST catalogWrapper =
catalogWrapperManager.getCatalogWrapper(warehouse);
+ ConfigResponse response =
+ ConfigResponse.builder()
+ .withDefaults(catalogWrapper.getCatalogConfigToClient())
+ .withDefault("prefix", warehouse)
+ .build();
return IcebergRestUtils.ok(response);
}
}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
index 3d322a7a8a..7cfa4cc6fc 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
@@ -111,6 +111,17 @@ public class IcebergRestTestUtil {
catalogConf.put(String.format("%s.catalog-backend-name",
catalogConfigPrefix), PREFIX);
catalogConf.put(
CredentialConstants.CREDENTIAL_PROVIDERS,
DummyCredentialProvider.DUMMY_CREDENTIAL_TYPE);
+ catalogConf.put(
+ String.format("%s.%s", catalogConfigPrefix,
IcebergConstants.IO_IMPL),
+ "org.apache.iceberg.aws.s3.S3FileIO");
+ catalogConf.put(
+ String.format("%s.%s", catalogConfigPrefix,
IcebergConstants.ICEBERG_S3_ENDPOINT),
+ "https://s3-endpoint.example.com");
+ catalogConf.put(
+ String.format("%s.%s", catalogConfigPrefix,
IcebergConstants.AWS_S3_REGION), "us-west-2");
+ catalogConf.put(
+ String.format("%s.%s", catalogConfigPrefix,
IcebergConstants.ICEBERG_OSS_ENDPOINT),
+ "https://oss-endpoint.example.com");
IcebergConfigProvider configProvider =
IcebergConfigProviderFactory.create(catalogConf);
configProvider.initialize(catalogConf);
// used to override register table interface
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergConfig.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergConfig.java
index 1116fc0bb0..04ac0cab10 100644
---
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergConfig.java
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergConfig.java
@@ -18,11 +18,14 @@
*/
package org.apache.gravitino.iceberg.service.rest;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
import java.util.Optional;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.iceberg.rest.responses.ConfigResponse;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
@@ -48,6 +51,57 @@ public class TestIcebergConfig extends IcebergTestBase {
Assertions.assertEquals(0, response.overrides().size());
}
+ @ParameterizedTest
+ @ValueSource(strings = {"", IcebergRestTestUtil.PREFIX})
+ public void testConfigWithEmptyWarehouse(String prefix) {
+ setUrlPathWithPrefix(prefix);
+ Map<String, String> queryParams = ImmutableMap.of("warehouse", "");
+ Response resp =
+ getIcebergClientBuilder(IcebergRestTestUtil.CONFIG_PATH,
Optional.of(queryParams)).get();
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+
+ ConfigResponse response = resp.readEntity(ConfigResponse.class);
+ Assertions.assertEquals(0, response.defaults().size());
+ Assertions.assertEquals(0, response.overrides().size());
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"", IcebergRestTestUtil.PREFIX})
+ public void testConfigWithValidWarehouse(String prefix) {
+ setUrlPathWithPrefix(prefix);
+ String warehouseName = IcebergRestTestUtil.PREFIX;
+ Map<String, String> queryParams = ImmutableMap.of("warehouse",
warehouseName);
+ Response resp =
+ getIcebergClientBuilder(IcebergRestTestUtil.CONFIG_PATH,
Optional.of(queryParams)).get();
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+ ConfigResponse response = resp.readEntity(ConfigResponse.class);
+ Map<String, String> expectedConfig =
+ ImmutableMap.of(
+ "prefix",
+ warehouseName,
+ IcebergConstants.IO_IMPL,
+ "org.apache.iceberg.aws.s3.S3FileIO",
+ IcebergConstants.ICEBERG_S3_ENDPOINT,
+ "https://s3-endpoint.example.com",
+ IcebergConstants.AWS_S3_REGION,
+ "us-west-2",
+ IcebergConstants.ICEBERG_OSS_ENDPOINT,
+ "https://oss-endpoint.example.com");
+ Assertions.assertEquals(expectedConfig, response.defaults());
+ Assertions.assertEquals(0, response.overrides().size());
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"invalid-catalog", "warehouse_123"})
+ public void testConfigWithNonExistentWarehouses(String warehouse) {
+ Map<String, String> queryParams = ImmutableMap.of("warehouse", warehouse);
+ Response resp =
+ getIcebergClientBuilder(IcebergRestTestUtil.CONFIG_PATH,
Optional.of(queryParams)).get();
+ Assertions.assertEquals(404, resp.getStatus());
+ }
+
@ParameterizedTest
@ValueSource(strings = {"PREFIX", "", "\\\n\t\\\'", "\u0024", "\100", "[_~"})
void testIcebergRestValidPrefix(String prefix) {
diff --git
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT.java
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT.java
index a10c82e0e6..26f59302d2 100644
---
a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT.java
+++
b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/iceberg/SparkIcebergCatalogRestBackendIT.java
@@ -37,8 +37,6 @@ public abstract class SparkIcebergCatalogRestBackendIT
extends SparkIcebergCatal
IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST);
catalogProperties.put(
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
icebergRestServiceUri);
- catalogProperties.put(
- IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
warehouse);
return catalogProperties;
}