This is an automated email from the ASF dual-hosted git repository.

yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 81ddf32c2 Implement PolicyCatalogAdapter (#1438)
81ddf32c2 is described below

commit 81ddf32c2ad298fed07eb589e7d5f917f8c4e604
Author: Honah (Jonas) J. <hon...@apache.org>
AuthorDate: Fri Apr 25 14:01:55 2025 -0500

    Implement PolicyCatalogAdapter (#1438)
---
 .../polaris/service/it/env/PolarisClient.java      |   9 +
 .../apache/polaris/service/it/env/PolicyApi.java   | 194 ++++++++++
 .../test/PolarisPolicyServiceIntegrationTest.java  | 428 +++++++++++++++++++++
 plugins/spark/v3.5/regtests/spark_sql.ref          |   2 +-
 .../polaris/core/config/FeatureConfiguration.java  |   7 +
 .../apache/polaris/core/rest/PolarisEndpoints.java |  45 +++
 .../polaris/core/rest/PolarisResourcePaths.java    |   8 +
 .../service/quarkus/it/QuarkusPolicyServiceIT.java |  10 +-
 .../it/QuarkusPolicyServiceIntegrationTest.java    |  10 +-
 regtests/t_spark_sql/ref/spark_sql_basic.sh.ref    |   2 +-
 regtests/t_spark_sql/ref/spark_sql_views.sh.ref    |   2 +-
 .../catalog/iceberg/IcebergCatalogAdapter.java     |   1 +
 .../catalog/policy/PolicyCatalogAdapter.java       | 213 ++++++++++
 .../catalog/policy/PolicyCatalogHandler.java       |   3 +
 14 files changed, 921 insertions(+), 13 deletions(-)

diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/env/PolarisClient.java b/integration-tests/src/main/java/org/apache/polaris/service/it/env/PolarisClient.java
index e18f8736b..baec590f9 100644
--- a/integration-tests/src/main/java/org/apache/polaris/service/it/env/PolarisClient.java
+++ b/integration-tests/src/main/java/org/apache/polaris/service/it/env/PolarisClient.java
@@ -116,6 +116,15 @@ public final class PolarisClient implements AutoCloseable {
         client, endpoints, obtainToken(credentials), 
endpoints.catalogApiEndpoint());
   }
 
+  public PolicyApi policyApi(PrincipalWithCredentials principal) {
+    return new PolicyApi(client, endpoints, obtainToken(principal), 
endpoints.catalogApiEndpoint());
+  }
+
+  public PolicyApi policyApi(ClientCredentials credentials) {
+    return new PolicyApi(
+        client, endpoints, obtainToken(credentials), 
endpoints.catalogApiEndpoint());
+  }
+
   /**
    * Requests an access token from the Polaris server for the client ID/secret 
pair that is part of
    * the given principal data object.
diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/env/PolicyApi.java b/integration-tests/src/main/java/org/apache/polaris/service/it/env/PolicyApi.java
new file mode 100644
index 000000000..7597aaef3
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/polaris/service/it/env/PolicyApi.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.it.env;
+
+import jakarta.ws.rs.client.Client;
+import jakarta.ws.rs.client.Entity;
+import jakarta.ws.rs.core.Response;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.rest.RESTUtil;
+import org.apache.polaris.core.policy.PolicyType;
+import org.apache.polaris.service.types.ApplicablePolicy;
+import org.apache.polaris.service.types.AttachPolicyRequest;
+import org.apache.polaris.service.types.CreatePolicyRequest;
+import org.apache.polaris.service.types.DetachPolicyRequest;
+import org.apache.polaris.service.types.GetApplicablePoliciesResponse;
+import org.apache.polaris.service.types.ListPoliciesResponse;
+import org.apache.polaris.service.types.LoadPolicyResponse;
+import org.apache.polaris.service.types.Policy;
+import org.apache.polaris.service.types.PolicyAttachmentTarget;
+import org.apache.polaris.service.types.PolicyIdentifier;
+import org.apache.polaris.service.types.UpdatePolicyRequest;
+import org.assertj.core.api.Assertions;
+
+public class PolicyApi extends RestApi {
+  PolicyApi(Client client, PolarisApiEndpoints endpoints, String authToken, 
URI uri) {
+    super(client, endpoints, authToken, uri);
+  }
+
+  public void purge(String catalog, Namespace ns) {
+    listPolicies(catalog, ns).forEach(t -> dropPolicy(catalog, t));
+  }
+
+  public List<PolicyIdentifier> listPolicies(String catalog, Namespace 
namespace) {
+    return listPolicies(catalog, namespace, null);
+  }
+
+  public List<PolicyIdentifier> listPolicies(String catalog, Namespace 
namespace, PolicyType type) {
+    String ns = RESTUtil.encodeNamespace(namespace);
+    Map<String, String> queryParams = new HashMap<>();
+    if (type != null) {
+      queryParams.put("policyType", type.getName());
+    }
+    try (Response res =
+        request(
+                "polaris/v1/{cat}/namespaces/{ns}/policies",
+                Map.of("cat", catalog, "ns", ns),
+                queryParams)
+            .get()) {
+      
Assertions.assertThat(res.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
+      return 
res.readEntity(ListPoliciesResponse.class).getIdentifiers().stream().toList();
+    }
+  }
+
+  public void dropPolicy(String catalog, PolicyIdentifier policyIdentifier) {
+    String ns = RESTUtil.encodeNamespace(policyIdentifier.getNamespace());
+    try (Response res =
+        request(
+                "polaris/v1/{cat}/namespaces/{ns}/policies/{policy}",
+                Map.of("cat", catalog, "ns", ns, "policy", 
policyIdentifier.getName()))
+            .delete()) {
+      
Assertions.assertThat(res.getStatus()).isEqualTo(Response.Status.NO_CONTENT.getStatusCode());
+    }
+  }
+
+  public Policy loadPolicy(String catalog, PolicyIdentifier policyIdentifier) {
+    String ns = RESTUtil.encodeNamespace(policyIdentifier.getNamespace());
+    try (Response res =
+        request(
+                "polaris/v1/{cat}/namespaces/{ns}/policies/{policy}",
+                Map.of("cat", catalog, "ns", ns, "policy", 
policyIdentifier.getName()))
+            .get()) {
+      
Assertions.assertThat(res.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
+      return res.readEntity(LoadPolicyResponse.class).getPolicy();
+    }
+  }
+
+  public Policy createPolicy(
+      String catalog,
+      PolicyIdentifier policyIdentifier,
+      PolicyType policyType,
+      String content,
+      String description) {
+    String ns = RESTUtil.encodeNamespace(policyIdentifier.getNamespace());
+    CreatePolicyRequest request =
+        CreatePolicyRequest.builder()
+            .setType(policyType.getName())
+            .setName(policyIdentifier.getName())
+            .setDescription(description)
+            .setContent(content)
+            .build();
+    try (Response res =
+        request("polaris/v1/{cat}/namespaces/{ns}/policies", Map.of("cat", 
catalog, "ns", ns))
+            .post(Entity.json(request))) {
+      
Assertions.assertThat(res.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
+      return res.readEntity(LoadPolicyResponse.class).getPolicy();
+    }
+  }
+
+  public Policy updatePolicy(
+      String catalog,
+      PolicyIdentifier policyIdentifier,
+      String newContent,
+      String newDescription,
+      int currentPolicyVersion) {
+    String ns = RESTUtil.encodeNamespace(policyIdentifier.getNamespace());
+    UpdatePolicyRequest request =
+        UpdatePolicyRequest.builder()
+            .setContent(newContent)
+            .setDescription(newDescription)
+            .setCurrentPolicyVersion(currentPolicyVersion)
+            .build();
+    try (Response res =
+        request(
+                "polaris/v1/{cat}/namespaces/{ns}/policies/{policy}",
+                Map.of("cat", catalog, "ns", ns, "policy", 
policyIdentifier.getName()))
+            .put(Entity.json(request))) {
+      
Assertions.assertThat(res.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
+      return res.readEntity(LoadPolicyResponse.class).getPolicy();
+    }
+  }
+
+  public void attachPolicy(
+      String catalog,
+      PolicyIdentifier policyIdentifier,
+      PolicyAttachmentTarget target,
+      Map<String, String> parameters) {
+    String ns = RESTUtil.encodeNamespace(policyIdentifier.getNamespace());
+    AttachPolicyRequest request =
+        
AttachPolicyRequest.builder().setTarget(target).setParameters(parameters).build();
+    try (Response res =
+        request(
+                "polaris/v1/{cat}/namespaces/{ns}/policies/{policy}/mappings",
+                Map.of("cat", catalog, "ns", ns, "policy", 
policyIdentifier.getName()))
+            .put(Entity.json(request))) {
+      
Assertions.assertThat(res.getStatus()).isEqualTo(Response.Status.NO_CONTENT.getStatusCode());
+    }
+  }
+
+  public void detachPolicy(
+      String catalog, PolicyIdentifier policyIdentifier, 
PolicyAttachmentTarget target) {
+    String ns = RESTUtil.encodeNamespace(policyIdentifier.getNamespace());
+    DetachPolicyRequest request = 
DetachPolicyRequest.builder().setTarget(target).build();
+    try (Response res =
+        request(
+                "polaris/v1/{cat}/namespaces/{ns}/policies/{policy}/mappings",
+                Map.of("cat", catalog, "ns", ns, "policy", 
policyIdentifier.getName()))
+            .post(Entity.json(request))) {
+      
Assertions.assertThat(res.getStatus()).isEqualTo(Response.Status.NO_CONTENT.getStatusCode());
+    }
+  }
+
+  public List<ApplicablePolicy> getApplicablePolicies(
+      String catalog, Namespace namespace, String targetName, PolicyType 
policyType) {
+    String ns = namespace != null ? RESTUtil.encodeNamespace(namespace) : null;
+    Map<String, String> queryParams = new HashMap<>();
+    if (ns != null) {
+      queryParams.put("namespace", ns);
+    }
+    if (targetName != null) {
+      queryParams.put("target-name", targetName);
+    }
+    if (policyType != null) {
+      queryParams.put("policyType", policyType.getName());
+    }
+
+    try (Response res =
+        request("polaris/v1/{cat}/applicable-policies", Map.of("cat", 
catalog), queryParams)
+            .get()) {
+      
Assertions.assertThat(res.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
+      return 
res.readEntity(GetApplicablePoliciesResponse.class).getApplicablePolicies().stream()
+          .toList();
+    }
+  }
+}
diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisPolicyServiceIntegrationTest.java b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisPolicyServiceIntegrationTest.java
new file mode 100644
index 000000000..055fa411c
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisPolicyServiceIntegrationTest.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.it.test;
+
+import static org.apache.polaris.service.it.env.PolarisClient.polarisClient;
+
+import com.google.common.collect.ImmutableMap;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.types.Types;
+import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.admin.model.CatalogGrant;
+import org.apache.polaris.core.admin.model.CatalogPrivilege;
+import org.apache.polaris.core.admin.model.CatalogProperties;
+import org.apache.polaris.core.admin.model.CatalogRole;
+import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
+import org.apache.polaris.core.admin.model.GrantResource;
+import org.apache.polaris.core.admin.model.PolarisCatalog;
+import org.apache.polaris.core.admin.model.PrincipalWithCredentials;
+import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.core.catalog.PolarisCatalogHelpers;
+import org.apache.polaris.core.entity.CatalogEntity;
+import org.apache.polaris.core.policy.PredefinedPolicyTypes;
+import org.apache.polaris.service.it.env.ClientCredentials;
+import org.apache.polaris.service.it.env.IcebergHelper;
+import org.apache.polaris.service.it.env.IntegrationTestsHelper;
+import org.apache.polaris.service.it.env.ManagementApi;
+import org.apache.polaris.service.it.env.PolarisApiEndpoints;
+import org.apache.polaris.service.it.env.PolarisClient;
+import org.apache.polaris.service.it.env.PolicyApi;
+import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension;
+import org.apache.polaris.service.types.ApplicablePolicy;
+import org.apache.polaris.service.types.Policy;
+import org.apache.polaris.service.types.PolicyAttachmentTarget;
+import org.apache.polaris.service.types.PolicyIdentifier;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+@ExtendWith(PolarisIntegrationTestExtension.class)
+public class PolarisPolicyServiceIntegrationTest {
+
+  private static final String TEST_ROLE_ARN =
+      Optional.ofNullable(System.getenv("INTEGRATION_TEST_ROLE_ARN"))
+          .orElse("arn:aws:iam::123456789012:role/my-role");
+
+  private static final String EXAMPLE_TABLE_MAINTENANCE_POLICY_CONTENT = 
"{\"enable\":true}";
+  private static final Namespace NS1 = Namespace.of("NS1");
+  private static final Namespace NS2 = Namespace.of("NS2");
+  private static final PolicyIdentifier NS1_P1 = new PolicyIdentifier(NS1, 
"P1");
+  private static final PolicyIdentifier NS1_P2 = new PolicyIdentifier(NS1, 
"P2");
+  private static final PolicyIdentifier NS1_P3 = new PolicyIdentifier(NS1, 
"P3");
+  private static final TableIdentifier NS2_T1 = TableIdentifier.of(NS2, "T1");
+
+  private static URI s3BucketBase;
+  private static String principalRoleName;
+  private static ClientCredentials adminCredentials;
+  private static PrincipalWithCredentials principalCredentials;
+  private static PolarisApiEndpoints endpoints;
+  private static PolarisClient client;
+  private static ManagementApi managementApi;
+  private static PolicyApi policyApi;
+
+  private RESTCatalog restCatalog;
+  private String currentCatalogName;
+
+  private final String catalogBaseLocation =
+      s3BucketBase + "/" + System.getenv("USER") + "/path/to/data";
+
+  private static final String[] DEFAULT_CATALOG_PROPERTIES = {
+    "allow.unstructured.table.location", "true",
+    "allow.external.table.location", "true"
+  };
+
+  @Retention(RetentionPolicy.RUNTIME)
+  private @interface CatalogConfig {
+    Catalog.TypeEnum value() default Catalog.TypeEnum.INTERNAL;
+
+    String[] properties() default {
+      "allow.unstructured.table.location", "true",
+      "allow.external.table.location", "true"
+    };
+  }
+
+  @Retention(RetentionPolicy.RUNTIME)
+  private @interface RestCatalogConfig {
+    String[] value() default {};
+  }
+
+  @BeforeAll
+  public static void setup(
+      PolarisApiEndpoints apiEndpoints, ClientCredentials credentials, 
@TempDir Path tempDir) {
+    adminCredentials = credentials;
+    endpoints = apiEndpoints;
+    client = polarisClient(endpoints);
+    managementApi = client.managementApi(credentials);
+    String principalName = client.newEntityName("snowman-rest");
+    principalRoleName = client.newEntityName("rest-admin");
+    principalCredentials = 
managementApi.createPrincipalWithRole(principalName, principalRoleName);
+    URI testRootUri = IntegrationTestsHelper.getTemporaryDirectory(tempDir);
+    s3BucketBase = testRootUri.resolve("my-bucket");
+
+    policyApi = client.policyApi(principalCredentials);
+  }
+
+  @AfterAll
+  public static void close() throws Exception {
+    client.close();
+  }
+
+  @BeforeEach
+  public void before(TestInfo testInfo) {
+    String principalName = "snowman-rest-" + UUID.randomUUID();
+    principalRoleName = "rest-admin-" + UUID.randomUUID();
+    PrincipalWithCredentials principalCredentials =
+        managementApi.createPrincipalWithRole(principalName, 
principalRoleName);
+
+    Method method = testInfo.getTestMethod().orElseThrow();
+    currentCatalogName = client.newEntityName(method.getName());
+    AwsStorageConfigInfo awsConfigModel =
+        AwsStorageConfigInfo.builder()
+            .setRoleArn(TEST_ROLE_ARN)
+            .setExternalId("externalId")
+            .setUserArn("a:user:arn")
+            .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
+            .setAllowedLocations(List.of("s3://my-old-bucket/path/to/data"))
+            .build();
+    Optional<PolarisPolicyServiceIntegrationTest.CatalogConfig> catalogConfig =
+        Optional.ofNullable(
+            
method.getAnnotation(PolarisPolicyServiceIntegrationTest.CatalogConfig.class));
+
+    CatalogProperties.Builder catalogPropsBuilder = 
CatalogProperties.builder(catalogBaseLocation);
+    String[] properties =
+        catalogConfig
+            .map(PolarisPolicyServiceIntegrationTest.CatalogConfig::properties)
+            .orElse(DEFAULT_CATALOG_PROPERTIES);
+    for (int i = 0; i < properties.length; i += 2) {
+      catalogPropsBuilder.addProperty(properties[i], properties[i + 1]);
+    }
+    if (!s3BucketBase.getScheme().equals("file")) {
+      catalogPropsBuilder.addProperty(
+          CatalogEntity.REPLACE_NEW_LOCATION_PREFIX_WITH_CATALOG_DEFAULT_KEY, 
"file:");
+    }
+    Catalog catalog =
+        PolarisCatalog.builder()
+            .setType(
+                catalogConfig
+                    
.map(PolarisPolicyServiceIntegrationTest.CatalogConfig::value)
+                    .orElse(Catalog.TypeEnum.INTERNAL))
+            .setName(currentCatalogName)
+            .setProperties(catalogPropsBuilder.build())
+            .setStorageConfigInfo(
+                s3BucketBase.getScheme().equals("file")
+                    ? new FileStorageConfigInfo(
+                        StorageConfigInfo.StorageTypeEnum.FILE, 
List.of("file://"))
+                    : awsConfigModel)
+            .build();
+
+    managementApi.createCatalog(principalRoleName, catalog);
+
+    Optional<PolarisPolicyServiceIntegrationTest.RestCatalogConfig> 
restCatalogConfig =
+        testInfo
+            .getTestMethod()
+            .flatMap(
+                m ->
+                    Optional.ofNullable(
+                        m.getAnnotation(
+                            
PolarisPolicyServiceIntegrationTest.RestCatalogConfig.class)));
+    ImmutableMap.Builder<String, String> extraPropertiesBuilder = 
ImmutableMap.builder();
+    restCatalogConfig.ifPresent(
+        config -> {
+          for (int i = 0; i < config.value().length; i += 2) {
+            extraPropertiesBuilder.put(config.value()[i], config.value()[i + 
1]);
+          }
+        });
+
+    restCatalog =
+        IcebergHelper.restCatalog(
+            client,
+            endpoints,
+            principalCredentials,
+            currentCatalogName,
+            extraPropertiesBuilder.build());
+    CatalogGrant catalogGrant =
+        new CatalogGrant(CatalogPrivilege.CATALOG_MANAGE_CONTENT, 
GrantResource.TypeEnum.CATALOG);
+    managementApi.createCatalogRole(currentCatalogName, "catalogrole1");
+    managementApi.addGrant(currentCatalogName, "catalogrole1", catalogGrant);
+    CatalogRole catalogRole = managementApi.getCatalogRole(currentCatalogName, 
"catalogrole1");
+    managementApi.grantCatalogRoleToPrincipalRole(
+        principalRoleName, currentCatalogName, catalogRole);
+
+    policyApi = client.policyApi(principalCredentials);
+  }
+
+  @AfterEach
+  public void cleanUp() {
+    client.cleanUp(adminCredentials);
+  }
+
+  @Test
+  public void testCreatePolicy() {
+    restCatalog.createNamespace(NS1);
+    Policy policy =
+        policyApi.createPolicy(
+            currentCatalogName,
+            NS1_P1,
+            PredefinedPolicyTypes.DATA_COMPACTION,
+            EXAMPLE_TABLE_MAINTENANCE_POLICY_CONTENT,
+            "test policy");
+
+    Assertions.assertThat(policy).isNotNull();
+    Assertions.assertThat(policy.getName()).isEqualTo("P1");
+    Assertions.assertThat(policy.getDescription()).isEqualTo("test policy");
+    Assertions.assertThat(policy.getPolicyType())
+        .isEqualTo(PredefinedPolicyTypes.DATA_COMPACTION.getName());
+    
Assertions.assertThat(policy.getContent()).isEqualTo(EXAMPLE_TABLE_MAINTENANCE_POLICY_CONTENT);
+    Assertions.assertThat(policy.getInheritable())
+        .isEqualTo(PredefinedPolicyTypes.DATA_COMPACTION.isInheritable());
+    Assertions.assertThat(policy.getVersion()).isEqualTo(0);
+
+    Policy loadedPolicy = policyApi.loadPolicy(currentCatalogName, NS1_P1);
+    Assertions.assertThat(loadedPolicy).isEqualTo(policy);
+
+    policyApi.dropPolicy(currentCatalogName, NS1_P1);
+  }
+
+  @Test
+  public void testDropPolicy() {
+    restCatalog.createNamespace(NS1);
+    policyApi.createPolicy(
+        currentCatalogName,
+        NS1_P1,
+        PredefinedPolicyTypes.DATA_COMPACTION,
+        EXAMPLE_TABLE_MAINTENANCE_POLICY_CONTENT,
+        "test policy");
+    policyApi.dropPolicy(currentCatalogName, NS1_P1);
+    Assertions.assertThat(policyApi.listPolicies(currentCatalogName, 
NS1)).hasSize(0);
+  }
+
+  @Test
+  public void testUpdatePolicy() {
+    restCatalog.createNamespace(NS1);
+    policyApi.createPolicy(
+        currentCatalogName,
+        NS1_P1,
+        PredefinedPolicyTypes.DATA_COMPACTION,
+        EXAMPLE_TABLE_MAINTENANCE_POLICY_CONTENT,
+        "test policy");
+
+    String updatedContent = "{\"enable\":false}";
+    String updatedDescription = "updated test policy";
+    Policy updatedPolicy =
+        policyApi.updatePolicy(currentCatalogName, NS1_P1, updatedContent, 
updatedDescription, 0);
+
+    Assertions.assertThat(updatedPolicy).isNotNull();
+    Assertions.assertThat(updatedPolicy.getName()).isEqualTo("P1");
+    
Assertions.assertThat(updatedPolicy.getDescription()).isEqualTo(updatedDescription);
+    Assertions.assertThat(updatedPolicy.getPolicyType())
+        .isEqualTo(PredefinedPolicyTypes.DATA_COMPACTION.getName());
+    
Assertions.assertThat(updatedPolicy.getContent()).isEqualTo(updatedContent);
+    Assertions.assertThat(updatedPolicy.getInheritable())
+        .isEqualTo(PredefinedPolicyTypes.DATA_COMPACTION.isInheritable());
+    Assertions.assertThat(updatedPolicy.getVersion()).isEqualTo(1);
+
+    policyApi.dropPolicy(currentCatalogName, NS1_P1);
+  }
+
+  @Test
+  public void testListPolicies() {
+    restCatalog.createNamespace(NS1);
+    policyApi.createPolicy(
+        currentCatalogName,
+        NS1_P1,
+        PredefinedPolicyTypes.DATA_COMPACTION,
+        EXAMPLE_TABLE_MAINTENANCE_POLICY_CONTENT,
+        "test policy");
+    policyApi.createPolicy(
+        currentCatalogName,
+        NS1_P2,
+        PredefinedPolicyTypes.METADATA_COMPACTION,
+        EXAMPLE_TABLE_MAINTENANCE_POLICY_CONTENT,
+        "test policy");
+
+    Assertions.assertThat(policyApi.listPolicies(currentCatalogName, NS1))
+        .containsExactlyInAnyOrder(NS1_P1, NS1_P2);
+    Assertions.assertThat(
+            policyApi.listPolicies(currentCatalogName, NS1, 
PredefinedPolicyTypes.DATA_COMPACTION))
+        .containsExactly(NS1_P1);
+    Assertions.assertThat(
+            policyApi.listPolicies(
+                currentCatalogName, NS1, 
PredefinedPolicyTypes.METADATA_COMPACTION))
+        .containsExactly(NS1_P2);
+
+    policyApi.dropPolicy(currentCatalogName, NS1_P1);
+    policyApi.dropPolicy(currentCatalogName, NS1_P2);
+  }
+
+  @Test
+  public void testPolicyMapping() {
+    restCatalog.createNamespace(NS1);
+    restCatalog.createNamespace(NS2);
+    Policy p1 =
+        policyApi.createPolicy(
+            currentCatalogName,
+            NS1_P1,
+            PredefinedPolicyTypes.DATA_COMPACTION,
+            EXAMPLE_TABLE_MAINTENANCE_POLICY_CONTENT,
+            "test policy");
+    Policy p2 =
+        policyApi.createPolicy(
+            currentCatalogName,
+            NS1_P2,
+            PredefinedPolicyTypes.METADATA_COMPACTION,
+            EXAMPLE_TABLE_MAINTENANCE_POLICY_CONTENT,
+            "test policy");
+    Policy p3 =
+        policyApi.createPolicy(
+            currentCatalogName,
+            NS1_P3,
+            PredefinedPolicyTypes.ORPHAN_FILE_REMOVAL,
+            EXAMPLE_TABLE_MAINTENANCE_POLICY_CONTENT,
+            "test policy");
+
+    restCatalog
+        .buildTable(
+            NS2_T1, new Schema(Types.NestedField.of(1, true, "string", 
Types.StringType.get())))
+        .create();
+
+    PolicyAttachmentTarget catalogTarget =
+        
PolicyAttachmentTarget.builder().setType(PolicyAttachmentTarget.TypeEnum.CATALOG).build();
+    PolicyAttachmentTarget namespaceTarget =
+        PolicyAttachmentTarget.builder()
+            .setType(PolicyAttachmentTarget.TypeEnum.NAMESPACE)
+            .setPath(Arrays.asList(NS2.levels()))
+            .build();
+    PolicyAttachmentTarget tableTarget =
+        PolicyAttachmentTarget.builder()
+            .setType(PolicyAttachmentTarget.TypeEnum.TABLE_LIKE)
+            .setPath(PolarisCatalogHelpers.tableIdentifierToList(NS2_T1))
+            .build();
+
+    policyApi.attachPolicy(currentCatalogName, NS1_P1, catalogTarget, 
Map.of());
+    policyApi.attachPolicy(currentCatalogName, NS1_P2, namespaceTarget, 
Map.of());
+    policyApi.attachPolicy(currentCatalogName, NS1_P3, tableTarget, Map.of());
+
+    List<ApplicablePolicy> applicablePoliciesOnCatalog =
+        policyApi.getApplicablePolicies(currentCatalogName, null, null, null);
+    Assertions.assertThat(applicablePoliciesOnCatalog)
+        .containsExactly(policyToApplicablePolicy(p1, false, NS1));
+
+    List<ApplicablePolicy> applicablePoliciesOnNamespace =
+        policyApi.getApplicablePolicies(currentCatalogName, NS2, null, null);
+    Assertions.assertThat(applicablePoliciesOnNamespace)
+        .containsExactlyInAnyOrder(
+            policyToApplicablePolicy(p1, true, NS1), 
policyToApplicablePolicy(p2, false, NS1));
+
+    List<ApplicablePolicy> applicablePoliciesOnTable =
+        policyApi.getApplicablePolicies(currentCatalogName, NS2, 
NS2_T1.name(), null);
+    Assertions.assertThat(applicablePoliciesOnTable)
+        .containsExactlyInAnyOrder(
+            policyToApplicablePolicy(p1, true, NS1),
+            policyToApplicablePolicy(p2, true, NS1),
+            policyToApplicablePolicy(p3, false, NS1));
+
+    Assertions.assertThat(
+            policyApi.getApplicablePolicies(
+                currentCatalogName, NS2, NS2_T1.name(), 
PredefinedPolicyTypes.METADATA_COMPACTION))
+        .containsExactlyInAnyOrder(policyToApplicablePolicy(p2, true, NS1));
+
+    policyApi.detachPolicy(currentCatalogName, NS1_P1, catalogTarget);
+    policyApi.detachPolicy(currentCatalogName, NS1_P2, namespaceTarget);
+    policyApi.detachPolicy(currentCatalogName, NS1_P3, tableTarget);
+
+    policyApi.dropPolicy(currentCatalogName, NS1_P1);
+    policyApi.dropPolicy(currentCatalogName, NS1_P2);
+    policyApi.dropPolicy(currentCatalogName, NS1_P3);
+
+    restCatalog.dropTable(NS2_T1);
+  }
+
+  private static ApplicablePolicy policyToApplicablePolicy(
+      Policy policy, boolean inherited, Namespace parent) {
+    return new ApplicablePolicy(
+        policy.getPolicyType(),
+        policy.getInheritable(),
+        policy.getName(),
+        policy.getDescription(),
+        policy.getContent(),
+        policy.getVersion(),
+        inherited,
+        Arrays.asList(parent.levels()));
+  }
+}
diff --git a/plugins/spark/v3.5/regtests/spark_sql.ref b/plugins/spark/v3.5/regtests/spark_sql.ref
index 5825d0931..525a705ae 100755
--- a/plugins/spark/v3.5/regtests/spark_sql.ref
+++ b/plugins/spark/v3.5/regtests/spark_sql.ref
@@ -1,4 +1,4 @@
-{"defaults":{"default-base-location":"file:///tmp/spark_catalog"},"overrides":{"prefix":"spark_sql_catalog"},"endpoints":["GET
 /v1/{prefix}/namespaces","GET /v1/{prefix}/namespaces/{namespace}","HEAD 
/v1/{prefix}/namespaces/{namespace}","POST /v1/{prefix}/namespaces","POST 
/v1/{prefix}/namespaces/{namespace}/properties","DELETE 
/v1/{prefix}/namespaces/{namespace}","GET 
/v1/{prefix}/namespaces/{namespace}/tables","GET 
/v1/{prefix}/namespaces/{namespace}/tables/{table}","HEAD /v1/{prefix}/ [...]
+{"defaults":{"default-base-location":"file:///tmp/spark_catalog"},"overrides":{"prefix":"spark_sql_catalog"},"endpoints":["GET
 /v1/{prefix}/namespaces","GET /v1/{prefix}/namespaces/{namespace}","HEAD 
/v1/{prefix}/namespaces/{namespace}","POST /v1/{prefix}/namespaces","POST 
/v1/{prefix}/namespaces/{namespace}/properties","DELETE 
/v1/{prefix}/namespaces/{namespace}","GET 
/v1/{prefix}/namespaces/{namespace}/tables","GET 
/v1/{prefix}/namespaces/{namespace}/tables/{table}","HEAD /v1/{prefix}/ [...]
 Catalog created
 spark-sql (default)> use polaris;
 spark-sql ()> create namespace db1;
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
index f857d03ac..ee663dbb1 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
@@ -227,4 +227,11 @@ public class FeatureConfiguration<T> extends PolarisConfiguration<T> {
                   + " to perform federation to remote catalogs.")
           .defaultValue(false)
           .buildFeatureConfiguration();
+
+  public static final FeatureConfiguration<Boolean> ENABLE_POLICY_STORE =
+      PolarisConfiguration.<Boolean>builder()
+          .key("ENABLE_POLICY_STORE")
+          .description("If true, the policy-store endpoints are enabled")
+          .defaultValue(true)
+          .buildFeatureConfiguration();
 }
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/rest/PolarisEndpoints.java b/polaris-core/src/main/java/org/apache/polaris/core/rest/PolarisEndpoints.java
index edbfb1414..1fd91395b 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/rest/PolarisEndpoints.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/rest/PolarisEndpoints.java
@@ -25,6 +25,7 @@ import org.apache.polaris.core.config.FeatureConfiguration;
 import org.apache.polaris.core.context.CallContext;
 
 public class PolarisEndpoints {
+  // Generic table endpoints
   public static final Endpoint V1_LIST_GENERIC_TABLES =
       Endpoint.create("GET", PolarisResourcePaths.V1_GENERIC_TABLES);
   public static final Endpoint V1_LOAD_GENERIC_TABLE =
@@ -42,6 +43,36 @@ public class PolarisEndpoints {
           .add(V1_LOAD_GENERIC_TABLE)
           .build();
 
+  // Policy store endpoints
+  public static final Endpoint V1_LIST_POLICIES =
+      Endpoint.create("GET", PolarisResourcePaths.V1_POLICIES);
+  public static final Endpoint V1_CREATE_POLICY =
+      Endpoint.create("POST", PolarisResourcePaths.V1_POLICIES);
+  public static final Endpoint V1_LOAD_POLICY =
+      Endpoint.create("GET", PolarisResourcePaths.V1_POLICY);
+  public static final Endpoint V1_UPDATE_POLICY =
+      Endpoint.create("PUT", PolarisResourcePaths.V1_POLICY);
+  public static final Endpoint V1_DROP_POLICY =
+      Endpoint.create("DELETE", PolarisResourcePaths.V1_POLICY);
+  public static final Endpoint V1_ATTACH_POLICY =
+      Endpoint.create("PUT", PolarisResourcePaths.V1_POLICY_MAPPINGS);
+  public static final Endpoint V1_DETACH_POLICY =
+      Endpoint.create("POST", PolarisResourcePaths.V1_POLICY_MAPPINGS);
+  public static final Endpoint V1_GET_APPLICABLE_POLICIES =
+      Endpoint.create("GET", PolarisResourcePaths.V1_APPLICABLE_POLICIES);
+
+  public static final Set<Endpoint> POLICY_STORE_ENDPOINTS =
+      ImmutableSet.<Endpoint>builder()
+          .add(V1_LIST_POLICIES)
+          .add(V1_CREATE_POLICY)
+          .add(V1_LOAD_POLICY)
+          .add(V1_UPDATE_POLICY)
+          .add(V1_DROP_POLICY)
+          .add(V1_ATTACH_POLICY)
+          .add(V1_DETACH_POLICY)
+          .add(V1_GET_APPLICABLE_POLICIES)
+          .build();
+
   /**
    * Get the generic table endpoints. Returns GENERIC_TABLE_ENDPOINTS if 
ENABLE_GENERIC_TABLES is
    * set to true, otherwise, returns an empty set.
@@ -57,4 +88,18 @@ public class PolarisEndpoints {
 
     return genericTableEnabled ? GENERIC_TABLE_ENDPOINTS : ImmutableSet.of();
   }
+
+  /**
+   * Get the policy store endpoints. Returns POLICY_STORE_ENDPOINTS if 
ENABLE_POLICY_STORE is set to true,
+   * otherwise, returns an empty set.
+   */
+  public static Set<Endpoint> getSupportedPolicyEndpoints(CallContext 
callContext) {
+    boolean policyStoreEnabled =
+        callContext
+            .getPolarisCallContext()
+            .getConfigurationStore()
+            .getConfiguration(
+                callContext.getPolarisCallContext(), 
FeatureConfiguration.ENABLE_POLICY_STORE);
+    return policyStoreEnabled ? POLICY_STORE_ENDPOINTS : ImmutableSet.of();
+  }
 }
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/rest/PolarisResourcePaths.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/rest/PolarisResourcePaths.java
index 159a1a014..8a30d7962 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/rest/PolarisResourcePaths.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/rest/PolarisResourcePaths.java
@@ -34,6 +34,14 @@ public class PolarisResourcePaths {
   public static final String V1_GENERIC_TABLE =
       
"polaris/v1/{prefix}/namespaces/{namespace}/generic-tables/{generic-table}";
 
+  // Policy Store endpoints
+  public static final String V1_POLICIES = 
"/polaris/v1/{prefix}/namespaces/{namespace}/policies";
+  public static final String V1_POLICY =
+      "/polaris/v1/{prefix}/namespaces/{namespace}/policies/{policy-name}";
+  public static final String V1_POLICY_MAPPINGS =
+      
"/polaris/v1/{prefix}/namespaces/{namespace}/policies/{policy-name}/mappings";
+  public static final String V1_APPLICABLE_POLICIES = 
"/polaris/v1/{prefix}/applicable-policies";
+
   private final String prefix;
 
   public PolarisResourcePaths(String prefix) {
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyServiceImpl.java
 
b/quarkus/service/src/intTest/java/org/apache/polaris/service/quarkus/it/QuarkusPolicyServiceIT.java
similarity index 73%
copy from 
service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyServiceImpl.java
copy to 
quarkus/service/src/intTest/java/org/apache/polaris/service/quarkus/it/QuarkusPolicyServiceIT.java
index 55bb18000..68280a672 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyServiceImpl.java
+++ 
b/quarkus/service/src/intTest/java/org/apache/polaris/service/quarkus/it/QuarkusPolicyServiceIT.java
@@ -16,10 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.polaris.service.catalog.policy;
+package org.apache.polaris.service.quarkus.it;
 
-import jakarta.enterprise.context.RequestScoped;
-import org.apache.polaris.service.catalog.api.PolarisCatalogPolicyApiService;
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+import org.apache.polaris.service.it.test.PolarisPolicyServiceIntegrationTest;
 
-@RequestScoped
-public class PolicyServiceImpl implements PolarisCatalogPolicyApiService {}
+@QuarkusIntegrationTest
+public class QuarkusPolicyServiceIT extends 
PolarisPolicyServiceIntegrationTest {}
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyServiceImpl.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/it/QuarkusPolicyServiceIntegrationTest.java
similarity index 74%
rename from 
service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyServiceImpl.java
rename to 
quarkus/service/src/test/java/org/apache/polaris/service/quarkus/it/QuarkusPolicyServiceIntegrationTest.java
index 55bb18000..a668e92e5 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyServiceImpl.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/it/QuarkusPolicyServiceIntegrationTest.java
@@ -16,10 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.polaris.service.catalog.policy;
+package org.apache.polaris.service.quarkus.it;
 
-import jakarta.enterprise.context.RequestScoped;
-import org.apache.polaris.service.catalog.api.PolarisCatalogPolicyApiService;
+import io.quarkus.test.junit.QuarkusTest;
+import org.apache.polaris.service.it.test.PolarisPolicyServiceIntegrationTest;
 
-@RequestScoped
-public class PolicyServiceImpl implements PolarisCatalogPolicyApiService {}
+@QuarkusTest
+public class QuarkusPolicyServiceIntegrationTest extends 
PolarisPolicyServiceIntegrationTest {}
diff --git a/regtests/t_spark_sql/ref/spark_sql_basic.sh.ref 
b/regtests/t_spark_sql/ref/spark_sql_basic.sh.ref
index eaf0e18a8..79b317fcf 100755
--- a/regtests/t_spark_sql/ref/spark_sql_basic.sh.ref
+++ b/regtests/t_spark_sql/ref/spark_sql_basic.sh.ref
@@ -1,4 +1,4 @@
-{"defaults":{"default-base-location":"file:///tmp/spark_sql_s3_catalog"},"overrides":{"prefix":"spark_sql_basic_catalog"},"endpoints":["GET
 /v1/{prefix}/namespaces","GET /v1/{prefix}/namespaces/{namespace}","HEAD 
/v1/{prefix}/namespaces/{namespace}","POST /v1/{prefix}/namespaces","POST 
/v1/{prefix}/namespaces/{namespace}/properties","DELETE 
/v1/{prefix}/namespaces/{namespace}","GET 
/v1/{prefix}/namespaces/{namespace}/tables","GET 
/v1/{prefix}/namespaces/{namespace}/tables/{table}","HEAD  [...]
+{"defaults":{"default-base-location":"file:///tmp/spark_sql_s3_catalog"},"overrides":{"prefix":"spark_sql_basic_catalog"},"endpoints":["GET
 /v1/{prefix}/namespaces","GET /v1/{prefix}/namespaces/{namespace}","HEAD 
/v1/{prefix}/namespaces/{namespace}","POST /v1/{prefix}/namespaces","POST 
/v1/{prefix}/namespaces/{namespace}/properties","DELETE 
/v1/{prefix}/namespaces/{namespace}","GET 
/v1/{prefix}/namespaces/{namespace}/tables","GET 
/v1/{prefix}/namespaces/{namespace}/tables/{table}","HEAD  [...]
 Catalog created
 spark-sql (default)> use polaris;
 spark-sql ()> show namespaces;
diff --git a/regtests/t_spark_sql/ref/spark_sql_views.sh.ref 
b/regtests/t_spark_sql/ref/spark_sql_views.sh.ref
index ffae79311..9bb78d644 100755
--- a/regtests/t_spark_sql/ref/spark_sql_views.sh.ref
+++ b/regtests/t_spark_sql/ref/spark_sql_views.sh.ref
@@ -1,4 +1,4 @@
-{"defaults":{"default-base-location":"file:///tmp/spark_sql_s3_catalog"},"overrides":{"prefix":"spark_sql_views_catalog"},"endpoints":["GET
 /v1/{prefix}/namespaces","GET /v1/{prefix}/namespaces/{namespace}","HEAD 
/v1/{prefix}/namespaces/{namespace}","POST /v1/{prefix}/namespaces","POST 
/v1/{prefix}/namespaces/{namespace}/properties","DELETE 
/v1/{prefix}/namespaces/{namespace}","GET 
/v1/{prefix}/namespaces/{namespace}/tables","GET 
/v1/{prefix}/namespaces/{namespace}/tables/{table}","HEAD  [...]
+{"defaults":{"default-base-location":"file:///tmp/spark_sql_s3_catalog"},"overrides":{"prefix":"spark_sql_views_catalog"},"endpoints":["GET
 /v1/{prefix}/namespaces","GET /v1/{prefix}/namespaces/{namespace}","HEAD 
/v1/{prefix}/namespaces/{namespace}","POST /v1/{prefix}/namespaces","POST 
/v1/{prefix}/namespaces/{namespace}/properties","DELETE 
/v1/{prefix}/namespaces/{namespace}","GET 
/v1/{prefix}/namespaces/{namespace}/tables","GET 
/v1/{prefix}/namespaces/{namespace}/tables/{table}","HEAD  [...]
 Catalog created
 spark-sql (default)> use polaris;
 spark-sql ()> show namespaces;
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
index 689535139..e2dcefc0b 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogAdapter.java
@@ -724,6 +724,7 @@ public class IcebergCatalogAdapter
                         .addAll(VIEW_ENDPOINTS)
                         .addAll(COMMIT_ENDPOINT)
                         
.addAll(PolarisEndpoints.getSupportedGenericTableEndpoints(callContext))
+                        
.addAll(PolarisEndpoints.getSupportedPolicyEndpoints(callContext))
                         .build())
                 .build())
         .build();
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogAdapter.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogAdapter.java
new file mode 100644
index 000000000..fe70d00a8
--- /dev/null
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogAdapter.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.catalog.policy;
+
+import jakarta.enterprise.context.RequestScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.core.Response;
+import jakarta.ws.rs.core.SecurityContext;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.NotAuthorizedException;
+import org.apache.iceberg.rest.RESTUtil;
+import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
+import org.apache.polaris.core.auth.PolarisAuthorizer;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.policy.PolicyType;
+import org.apache.polaris.service.catalog.CatalogPrefixParser;
+import org.apache.polaris.service.catalog.api.PolarisCatalogPolicyApiService;
+import org.apache.polaris.service.catalog.common.CatalogAdapter;
+import org.apache.polaris.service.types.AttachPolicyRequest;
+import org.apache.polaris.service.types.CreatePolicyRequest;
+import org.apache.polaris.service.types.DetachPolicyRequest;
+import org.apache.polaris.service.types.GetApplicablePoliciesResponse;
+import org.apache.polaris.service.types.ListPoliciesResponse;
+import org.apache.polaris.service.types.LoadPolicyResponse;
+import org.apache.polaris.service.types.PolicyIdentifier;
+import org.apache.polaris.service.types.UpdatePolicyRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RequestScoped
+public class PolicyCatalogAdapter implements PolarisCatalogPolicyApiService, 
CatalogAdapter {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PolicyCatalogAdapter.class);
+
+  private final RealmContext realmContext;
+  private final CallContext callContext;
+  private final PolarisEntityManager entityManager;
+  private final PolarisMetaStoreManager metaStoreManager;
+  private final PolarisAuthorizer polarisAuthorizer;
+  private final CatalogPrefixParser prefixParser;
+
+  @Inject
+  public PolicyCatalogAdapter(
+      RealmContext realmContext,
+      CallContext callContext,
+      PolarisEntityManager entityManager,
+      PolarisMetaStoreManager metaStoreManager,
+      PolarisAuthorizer polarisAuthorizer,
+      CatalogPrefixParser prefixParser) {
+    this.realmContext = realmContext;
+    this.callContext = callContext;
+    this.entityManager = entityManager;
+    this.metaStoreManager = metaStoreManager;
+    this.polarisAuthorizer = polarisAuthorizer;
+    this.prefixParser = prefixParser;
+  }
+
+  private PolicyCatalogHandler newHandlerWrapper(SecurityContext 
securityContext, String prefix) {
+    var authenticatedPrincipal = (AuthenticatedPolarisPrincipal) 
securityContext.getUserPrincipal();
+    if (authenticatedPrincipal == null) {
+      throw new NotAuthorizedException("Failed to find authenticatedPrincipal 
in SecurityContext");
+    }
+
+    return new PolicyCatalogHandler(
+        callContext,
+        entityManager,
+        metaStoreManager,
+        securityContext,
+        prefixParser.prefixToCatalogName(realmContext, prefix),
+        polarisAuthorizer);
+  }
+
+  @Override
+  public Response createPolicy(
+      String prefix,
+      String namespace,
+      CreatePolicyRequest createPolicyRequest,
+      RealmContext realmContext,
+      SecurityContext securityContext) {
+    Namespace ns = decodeNamespace(namespace);
+    PolicyCatalogHandler handler = newHandlerWrapper(securityContext, prefix);
+    LoadPolicyResponse response = handler.createPolicy(ns, 
createPolicyRequest);
+    return Response.ok(response).build();
+  }
+
+  @Override
+  public Response listPolicies(
+      String prefix,
+      String namespace,
+      String pageToken,
+      Integer pageSize,
+      String policyType,
+      RealmContext realmContext,
+      SecurityContext securityContext) {
+    Namespace ns = decodeNamespace(namespace);
+    PolicyType type =
+        policyType != null ? 
PolicyType.fromName(RESTUtil.decodeString(policyType)) : null;
+    PolicyCatalogHandler handler = newHandlerWrapper(securityContext, prefix);
+    ListPoliciesResponse response = handler.listPolicies(ns, type);
+    return Response.ok(response).build();
+  }
+
+  @Override
+  public Response loadPolicy(
+      String prefix,
+      String namespace,
+      String policyName,
+      RealmContext realmContext,
+      SecurityContext securityContext) {
+    Namespace ns = decodeNamespace(namespace);
+    PolicyIdentifier identifier = new PolicyIdentifier(ns, 
RESTUtil.decodeString(policyName));
+    PolicyCatalogHandler handler = newHandlerWrapper(securityContext, prefix);
+    LoadPolicyResponse response = handler.loadPolicy(identifier);
+    return Response.ok(response).build();
+  }
+
+  @Override
+  public Response updatePolicy(
+      String prefix,
+      String namespace,
+      String policyName,
+      UpdatePolicyRequest updatePolicyRequest,
+      RealmContext realmContext,
+      SecurityContext securityContext) {
+    Namespace ns = decodeNamespace(namespace);
+    PolicyIdentifier identifier = new PolicyIdentifier(ns, 
RESTUtil.decodeString(policyName));
+    PolicyCatalogHandler handler = newHandlerWrapper(securityContext, prefix);
+    LoadPolicyResponse response = handler.updatePolicy(identifier, 
updatePolicyRequest);
+    return Response.ok(response).build();
+  }
+
+  @Override
+  public Response dropPolicy(
+      String prefix,
+      String namespace,
+      String policyName,
+      Boolean detachAll,
+      RealmContext realmContext,
+      SecurityContext securityContext) {
+    Namespace ns = decodeNamespace(namespace);
+    PolicyIdentifier identifier = new PolicyIdentifier(ns, 
RESTUtil.decodeString(policyName));
+    PolicyCatalogHandler handler = newHandlerWrapper(securityContext, prefix);
+    handler.dropPolicy(identifier, detachAll != null && detachAll);
+    return Response.noContent().build();
+  }
+
+  @Override
+  public Response attachPolicy(
+      String prefix,
+      String namespace,
+      String policyName,
+      AttachPolicyRequest attachPolicyRequest,
+      RealmContext realmContext,
+      SecurityContext securityContext) {
+    Namespace ns = decodeNamespace(namespace);
+    PolicyIdentifier identifier = new PolicyIdentifier(ns, 
RESTUtil.decodeString(policyName));
+    PolicyCatalogHandler handler = newHandlerWrapper(securityContext, prefix);
+    handler.attachPolicy(identifier, attachPolicyRequest);
+    return Response.noContent().build();
+  }
+
+  @Override
+  public Response detachPolicy(
+      String prefix,
+      String namespace,
+      String policyName,
+      DetachPolicyRequest detachPolicyRequest,
+      RealmContext realmContext,
+      SecurityContext securityContext) {
+    Namespace ns = decodeNamespace(namespace);
+    PolicyIdentifier identifier = new PolicyIdentifier(ns, 
RESTUtil.decodeString(policyName));
+    PolicyCatalogHandler handler = newHandlerWrapper(securityContext, prefix);
+    handler.detachPolicy(identifier, detachPolicyRequest);
+    return Response.noContent().build();
+  }
+
+  @Override
+  public Response getApplicablePolicies(
+      String prefix,
+      String pageToken,
+      Integer pageSize,
+      String namespace,
+      String targetName,
+      String policyType,
+      RealmContext realmContext,
+      SecurityContext securityContext) {
+    Namespace ns = namespace != null ? decodeNamespace(namespace) : null;
+    String target = targetName != null ? RESTUtil.decodeString(targetName) : 
null;
+    PolicyType type =
+        policyType != null ? 
PolicyType.fromName(RESTUtil.decodeString(policyType)) : null;
+    PolicyCatalogHandler handler = newHandlerWrapper(securityContext, prefix);
+    GetApplicablePoliciesResponse response = handler.getApplicablePolicies(ns, 
target, type);
+    return Response.ok(response).build();
+  }
+}
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogHandler.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogHandler.java
index f4dea27b4..8273256ba 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogHandler.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogHandler.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.polaris.core.auth.PolarisAuthorizableOperation;
 import org.apache.polaris.core.auth.PolarisAuthorizer;
 import org.apache.polaris.core.catalog.PolarisCatalogHelpers;
+import org.apache.polaris.core.config.FeatureConfiguration;
 import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.entity.PolarisEntitySubType;
 import org.apache.polaris.core.entity.PolarisEntityType;
@@ -72,6 +73,8 @@ public class PolicyCatalogHandler extends CatalogHandler {
 
   @Override
   protected void initializeCatalog() {
+    FeatureConfiguration.enforceFeatureEnabledOrThrow(
+        callContext, FeatureConfiguration.ENABLE_POLICY_STORE);
     this.policyCatalog = new PolicyCatalog(metaStoreManager, callContext, 
this.resolutionManifest);
   }
 

Reply via email to