This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new f52cfd54fa [#7360] feat(policy): support CURD ops for policy in
backend storage (part-3) (#7560)
f52cfd54fa is described below
commit f52cfd54faa123c5e53a6904152464a1eaf2e09b
Author: mchades <[email protected]>
AuthorDate: Tue Jul 15 11:47:21 2025 +0800
[#7360] feat(policy): support CURD ops for policy in backend storage
(part-3) (#7560)
### What changes were proposed in this pull request?
Support CRUD operations for policies in the backend storage (part 3).
### Why are the changes needed?
Fix: #7360
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Tests added in TestJDBCBackend and TestPolicyMetaService.
---
.../gravitino/storage/relational/JDBCBackend.java | 22 +-
.../relational/mapper/PolicyMetaMapper.java | 114 +++++++
.../mapper/PolicyMetaSQLProviderFactory.java | 91 ++++++
.../relational/mapper/PolicyVersionMapper.java | 71 +++++
.../mapper/PolicyVersionSQLProviderFactory.java | 90 ++++++
.../provider/base/PolicyMetaBaseSQLProvider.java | 158 +++++++++
.../base/PolicyVersionBaseSQLProvider.java | 113 +++++++
.../postgresql/PolicyMetaPostgreSQLProvider.java | 87 +++++
.../PolicyVersionPostgreSQLProvider.java | 94 ++++++
.../storage/relational/po/PolicyMaxVersionPO.java | 40 +++
.../relational/service/MetalakeMetaService.java | 10 +
.../relational/service/PolicyMetaService.java | 242 ++++++++++++++
.../storage/relational/utils/POConverters.java | 114 +++++++
.../apache/gravitino/utils/NameIdentifierUtil.java | 11 +
.../storage/relational/TestJDBCBackend.java | 175 ++++++++++
.../relational/service/TestPolicyMetaService.java | 355 +++++++++++++++++++++
16 files changed, 1783 insertions(+), 4 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index 966265c79f..0911a3a5df 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -44,6 +44,7 @@ import org.apache.gravitino.meta.FilesetEntity;
import org.apache.gravitino.meta.GroupEntity;
import org.apache.gravitino.meta.ModelEntity;
import org.apache.gravitino.meta.ModelVersionEntity;
+import org.apache.gravitino.meta.PolicyEntity;
import org.apache.gravitino.meta.RoleEntity;
import org.apache.gravitino.meta.SchemaEntity;
import org.apache.gravitino.meta.TableEntity;
@@ -59,6 +60,7 @@ import
org.apache.gravitino.storage.relational.service.MetalakeMetaService;
import org.apache.gravitino.storage.relational.service.ModelMetaService;
import org.apache.gravitino.storage.relational.service.ModelVersionMetaService;
import org.apache.gravitino.storage.relational.service.OwnerMetaService;
+import org.apache.gravitino.storage.relational.service.PolicyMetaService;
import org.apache.gravitino.storage.relational.service.RoleMetaService;
import org.apache.gravitino.storage.relational.service.SchemaMetaService;
import org.apache.gravitino.storage.relational.service.TableColumnMetaService;
@@ -124,6 +126,8 @@ public class JDBCBackend implements RelationalBackend {
case MODEL_VERSION:
return (List<E>)
ModelVersionMetaService.getInstance().listModelVersionsByNamespace(namespace);
+ case POLICY:
+ return (List<E>)
PolicyMetaService.getInstance().listPoliciesByNamespace(namespace);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for list operation", entityType);
@@ -172,6 +176,8 @@ public class JDBCBackend implements RelationalBackend {
+ "inserting the new model version.");
}
ModelVersionMetaService.getInstance().insertModelVersion((ModelVersionEntity)
e);
+ } else if (e instanceof PolicyEntity) {
+ PolicyMetaService.getInstance().insertPolicy((PolicyEntity) e,
overwritten);
} else {
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for insert operation", e.getClass());
@@ -207,6 +213,8 @@ public class JDBCBackend implements RelationalBackend {
return (E) ModelMetaService.getInstance().updateModel(ident, updater);
case MODEL_VERSION:
return (E)
ModelVersionMetaService.getInstance().updateModelVersion(ident, updater);
+ case POLICY:
+ return (E) PolicyMetaService.getInstance().updatePolicy(ident,
updater);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for update operation", entityType);
@@ -242,6 +250,8 @@ public class JDBCBackend implements RelationalBackend {
return (E) ModelMetaService.getInstance().getModelByIdentifier(ident);
case MODEL_VERSION:
return (E)
ModelVersionMetaService.getInstance().getModelVersionByIdentifier(ident);
+ case POLICY:
+ return (E)
PolicyMetaService.getInstance().getPolicyByIdentifier(ident);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for get operation", entityType);
@@ -276,6 +286,8 @@ public class JDBCBackend implements RelationalBackend {
return ModelMetaService.getInstance().deleteModel(ident);
case MODEL_VERSION:
return ModelVersionMetaService.getInstance().deleteModelVersion(ident);
+ case POLICY:
+ return PolicyMetaService.getInstance().deletePolicy(ident);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for delete operation", entityType);
@@ -327,8 +339,9 @@ public class JDBCBackend implements RelationalBackend {
.deleteTagMetasByLegacyTimeline(
legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
case POLICY:
- // todo: Implement hard delete logic for policies.
- return 0;
+ return PolicyMetaService.getInstance()
+ .deletePolicyAndVersionMetasByLegacyTimeline(
+ legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
case COLUMN:
return TableColumnMetaService.getInstance()
.deleteColumnsByLegacyTimeline(legacyTimeline,
GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
@@ -376,8 +389,9 @@ public class JDBCBackend implements RelationalBackend {
versionRetentionCount,
GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
case POLICY:
- // todo: Implement delete old version logic for policies.
- return 0;
+ return PolicyMetaService.getInstance()
+ .deletePolicyVersionsByRetentionCount(
+ versionRetentionCount,
GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
default:
throw new IllegalArgumentException(
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaMapper.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaMapper.java
new file mode 100644
index 0000000000..e2fd8058fe
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaMapper.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.po.PolicyPO;
+import org.apache.ibatis.annotations.DeleteProvider;
+import org.apache.ibatis.annotations.InsertProvider;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.Result;
+import org.apache.ibatis.annotations.Results;
+import org.apache.ibatis.annotations.SelectProvider;
+import org.apache.ibatis.annotations.UpdateProvider;
+
+public interface PolicyMetaMapper {
+ String POLICY_META_TABLE_NAME = "policy_meta";
+
+ @Results({
+ @Result(property = "policyId", column = "policy_id"),
+ @Result(property = "policyName", column = "policy_name"),
+ @Result(property = "policyType", column = "policy_type"),
+ @Result(property = "metalakeId", column = "metalake_id"),
+ @Result(property = "inheritable", column = "inheritable"),
+ @Result(property = "exclusive", column = "exclusive"),
+ @Result(property = "supportedObjectTypes", column =
"supported_object_types"),
+ @Result(property = "auditInfo", column = "audit_info"),
+ @Result(property = "currentVersion", column = "current_version"),
+ @Result(property = "lastVersion", column = "last_version"),
+ @Result(property = "deletedAt", column = "deleted_at"),
+ @Result(property = "policyVersionPO.id", column = "id"),
+ @Result(property = "policyVersionPO.metalakeId", column =
"version_metalake_id"),
+ @Result(property = "policyVersionPO.policyId", column =
"version_policy_id"),
+ @Result(property = "policyVersionPO.version", column = "version"),
+ @Result(property = "policyVersionPO.policyComment", column =
"policy_comment"),
+ @Result(property = "policyVersionPO.enabled", column = "enabled"),
+ @Result(property = "policyVersionPO.content", column = "content"),
+ @Result(property = "policyVersionPO.deletedAt", column =
"version_deleted_at")
+ })
+ @SelectProvider(type = PolicyMetaSQLProviderFactory.class, method =
"listPolicyPOsByMetalake")
+ List<PolicyPO> listPolicyPOsByMetalake(@Param("metalakeName") String
metalakeName);
+
+ @InsertProvider(
+ type = PolicyMetaSQLProviderFactory.class,
+ method = "insertPolicyMetaOnDuplicateKeyUpdate")
+ void insertPolicyMetaOnDuplicateKeyUpdate(@Param("policyMeta") PolicyPO
policyPO);
+
+ @InsertProvider(type = PolicyMetaSQLProviderFactory.class, method =
"insertPolicyMeta")
+ void insertPolicyMeta(@Param("policyMeta") PolicyPO policyPO);
+
+ @Results({
+ @Result(property = "policyId", column = "policy_id"),
+ @Result(property = "policyName", column = "policy_name"),
+ @Result(property = "policyType", column = "policy_type"),
+ @Result(property = "metalakeId", column = "metalake_id"),
+ @Result(property = "inheritable", column = "inheritable"),
+ @Result(property = "exclusive", column = "exclusive"),
+ @Result(property = "supportedObjectTypes", column =
"supported_object_types"),
+ @Result(property = "auditInfo", column = "audit_info"),
+ @Result(property = "currentVersion", column = "current_version"),
+ @Result(property = "lastVersion", column = "last_version"),
+ @Result(property = "deletedAt", column = "deleted_at"),
+ @Result(property = "policyVersionPO.id", column = "id"),
+ @Result(property = "policyVersionPO.metalakeId", column =
"version_metalake_id"),
+ @Result(property = "policyVersionPO.policyId", column =
"version_policy_id"),
+ @Result(property = "policyVersionPO.version", column = "version"),
+ @Result(property = "policyVersionPO.policyComment", column =
"policy_comment"),
+ @Result(property = "policyVersionPO.enabled", column = "enabled"),
+ @Result(property = "policyVersionPO.content", column = "content"),
+ @Result(property = "policyVersionPO.deletedAt", column =
"version_deleted_at")
+ })
+ @SelectProvider(
+ type = PolicyMetaSQLProviderFactory.class,
+ method = "selectPolicyMetaByMetalakeAndName")
+ PolicyPO selectTagMetaByMetalakeAndName(
+ @Param("metalakeName") String metalakeName, @Param("policyName") String
policyName);
+
+ @UpdateProvider(type = PolicyMetaSQLProviderFactory.class, method =
"updatePolicyMeta")
+ Integer updatePolicyMeta(
+ @Param("newPolicyMeta") PolicyPO newPolicyMeta,
+ @Param("oldPolicyMeta") PolicyPO oldPolicyMeta);
+
+ @UpdateProvider(
+ type = PolicyMetaSQLProviderFactory.class,
+ method = "softDeletePolicyByMetalakeAndPolicyName")
+ Integer softDeletePolicyByMetalakeAndPolicyName(
+ @Param("metalakeName") String metalakeName, @Param("policyName") String
policyName);
+
+ @UpdateProvider(
+ type = PolicyMetaSQLProviderFactory.class,
+ method = "softDeletePolicyMetasByMetalakeId")
+ void softDeletePolicyMetasByMetalakeId(@Param("metalakeId") Long metalakeId);
+
+ @DeleteProvider(
+ type = PolicyMetaSQLProviderFactory.class,
+ method = "deletePolicyMetasByLegacyTimeline")
+ Integer deletePolicyMetasByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaSQLProviderFactory.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaSQLProviderFactory.java
new file mode 100644
index 0000000000..7a7243ca24
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyMetaSQLProviderFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import
org.apache.gravitino.storage.relational.mapper.provider.base.PolicyMetaBaseSQLProvider;
+import
org.apache.gravitino.storage.relational.mapper.provider.postgresql.PolicyMetaPostgreSQLProvider;
+import org.apache.gravitino.storage.relational.po.PolicyPO;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.annotations.Param;
+
+public class PolicyMetaSQLProviderFactory {
+
+ private static final Map<JDBCBackendType, PolicyMetaBaseSQLProvider>
+ POLICY_META_SQL_PROVIDER_MAP =
+ ImmutableMap.of(
+ JDBCBackendType.MYSQL, new PolicyMetaMySQLProvider(),
+ JDBCBackendType.H2, new PolicyMetaH2Provider(),
+ JDBCBackendType.POSTGRESQL, new PolicyMetaPostgreSQLProvider());
+
+ public static PolicyMetaBaseSQLProvider getProvider() {
+ String databaseId =
+ SqlSessionFactoryHelper.getInstance()
+ .getSqlSessionFactory()
+ .getConfiguration()
+ .getDatabaseId();
+
+ JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId);
+ return POLICY_META_SQL_PROVIDER_MAP.get(jdbcBackendType);
+ }
+
+ public static String listPolicyPOsByMetalake(@Param("metalakeName") String
metalakeName) {
+ return getProvider().listPolicyPOsByMetalake(metalakeName);
+ }
+
+ public static String insertPolicyMetaOnDuplicateKeyUpdate(
+ @Param("policyMeta") PolicyPO policyPO) {
+ return getProvider().insertPolicyMetaOnDuplicateKeyUpdate(policyPO);
+ }
+
+ public static String insertPolicyMeta(@Param("policyMeta") PolicyPO
policyPO) {
+ return getProvider().insertPolicyMeta(policyPO);
+ }
+
+ public static String selectPolicyMetaByMetalakeAndName(
+ @Param("metalakeName") String metalakeName, @Param("policyName") String
policyName) {
+ return getProvider().selectPolicyMetaByMetalakeAndName(metalakeName,
policyName);
+ }
+
+ public static String updatePolicyMeta(
+ @Param("newPolicyMeta") PolicyPO newPolicyMeta,
+ @Param("oldPolicyMeta") PolicyPO oldPolicyMeta) {
+ return getProvider().updatePolicyMeta(newPolicyMeta, oldPolicyMeta);
+ }
+
+ public static String softDeletePolicyByMetalakeAndPolicyName(
+ @Param("metalakeName") String metalakeName, @Param("policyName") String
policyName) {
+ return getProvider().softDeletePolicyByMetalakeAndPolicyName(metalakeName,
policyName);
+ }
+
+ public static String deletePolicyMetasByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit)
{
+ return getProvider().deletePolicyMetasByLegacyTimeline(legacyTimeline,
limit);
+ }
+
+ public static String softDeletePolicyMetasByMetalakeId(@Param("metalakeId")
Long metalakeId) {
+ return getProvider().softDeletePolicyMetasByMetalakeId(metalakeId);
+ }
+
+ static class PolicyMetaMySQLProvider extends PolicyMetaBaseSQLProvider {}
+
+ static class PolicyMetaH2Provider extends PolicyMetaBaseSQLProvider {}
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyVersionMapper.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyVersionMapper.java
new file mode 100644
index 0000000000..9bfbc62ea7
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyVersionMapper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.po.PolicyMaxVersionPO;
+import org.apache.gravitino.storage.relational.po.PolicyVersionPO;
+import org.apache.ibatis.annotations.InsertProvider;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.SelectProvider;
+import org.apache.ibatis.annotations.UpdateProvider;
+
+public interface PolicyVersionMapper {
+ String POLICY_VERSION_TABLE_NAME = "policy_version_info";
+
+ @InsertProvider(
+ type = PolicyVersionSQLProviderFactory.class,
+ method = "insertPolicyVersionOnDuplicateKeyUpdate")
+ void insertPolicyVersionOnDuplicateKeyUpdate(
+ @Param("policyVersion") PolicyVersionPO policyVersionPO);
+
+ @InsertProvider(type = PolicyVersionSQLProviderFactory.class, method =
"insertPolicyVersion")
+ void insertPolicyVersion(@Param("policyVersion") PolicyVersionPO
policyVersionPO);
+
+ @UpdateProvider(
+ type = PolicyVersionSQLProviderFactory.class,
+ method = "softDeletePolicyVersionByMetalakeAndPolicyName")
+ Integer softDeletePolicyVersionByMetalakeAndPolicyName(
+ @Param("metalakeName") String metalakeName, @Param("policyName") String
policyName);
+
+ @UpdateProvider(
+ type = PolicyVersionSQLProviderFactory.class,
+ method = "deletePolicyVersionsByLegacyTimeline")
+ Integer deletePolicyVersionsByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+
+ @SelectProvider(
+ type = PolicyVersionSQLProviderFactory.class,
+ method = "selectPolicyVersionsByRetentionCount")
+ List<PolicyMaxVersionPO> selectPolicyVersionsByRetentionCount(
+ @Param("versionRetentionCount") Long versionRetentionCount);
+
+ @UpdateProvider(
+ type = PolicyVersionSQLProviderFactory.class,
+ method = "softDeletePolicyVersionsByRetentionLine")
+ Integer softDeletePolicyVersionsByRetentionLine(
+ @Param("policyId") Long policyId,
+ @Param("versionRetentionLine") long versionRetentionLine,
+ @Param("limit") int limit);
+
+ @UpdateProvider(
+ type = PolicyVersionSQLProviderFactory.class,
+ method = "softDeletePolicyVersionsByMetalakeId")
+ void softDeletePolicyVersionsByMetalakeId(@Param("metalakeId") Long
metalakeId);
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyVersionSQLProviderFactory.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyVersionSQLProviderFactory.java
new file mode 100644
index 0000000000..9fbe4ddc79
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/PolicyVersionSQLProviderFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import
org.apache.gravitino.storage.relational.mapper.provider.base.PolicyVersionBaseSQLProvider;
+import
org.apache.gravitino.storage.relational.mapper.provider.postgresql.PolicyVersionPostgreSQLProvider;
+import org.apache.gravitino.storage.relational.po.PolicyVersionPO;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.annotations.Param;
+
+public class PolicyVersionSQLProviderFactory {
+
+ private static final Map<JDBCBackendType, PolicyVersionBaseSQLProvider>
+ POLICY_VERSION_SQL_PROVIDER_MAP =
+ ImmutableMap.of(
+ JDBCBackendType.MYSQL, new PolicyVersionMySQLProvider(),
+ JDBCBackendType.H2, new PolicyVersionH2Provider(),
+ JDBCBackendType.POSTGRESQL, new
PolicyVersionPostgreSQLProvider());
+
+ public static PolicyVersionBaseSQLProvider getProvider() {
+ String databaseId =
+ SqlSessionFactoryHelper.getInstance()
+ .getSqlSessionFactory()
+ .getConfiguration()
+ .getDatabaseId();
+
+ JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId);
+ return POLICY_VERSION_SQL_PROVIDER_MAP.get(jdbcBackendType);
+ }
+
+ public static String insertPolicyVersionOnDuplicateKeyUpdate(
+ @Param("policyVersion") PolicyVersionPO policyVersionPO) {
+ return
getProvider().insertPolicyVersionOnDuplicateKeyUpdate(policyVersionPO);
+ }
+
+ public static String insertPolicyVersion(
+ @Param("policyVersion") PolicyVersionPO policyVersionPO) {
+ return getProvider().insertPolicyVersion(policyVersionPO);
+ }
+
+ public static String softDeletePolicyVersionByMetalakeAndPolicyName(
+ @Param("metalakeName") String metalakeName, @Param("policyName") String
policyName) {
+ return
getProvider().softDeletePolicyVersionByMetalakeAndPolicyName(metalakeName,
policyName);
+ }
+
+ public static String deletePolicyVersionsByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit)
{
+ return getProvider().deletePolicyVersionsByLegacyTimeline(legacyTimeline,
limit);
+ }
+
+ public static String selectPolicyVersionsByRetentionCount(
+ @Param("versionRetentionCount") Long versionRetentionCount) {
+ return
getProvider().selectPolicyVersionsByRetentionCount(versionRetentionCount);
+ }
+
+ public static String softDeletePolicyVersionsByRetentionLine(
+ @Param("policyId") Long policyId,
+ @Param("versionRetentionLine") long versionRetentionLine,
+ @Param("limit") int limit) {
+ return getProvider()
+ .softDeletePolicyVersionsByRetentionLine(policyId,
versionRetentionLine, limit);
+ }
+
+ public static String
softDeletePolicyVersionsByMetalakeId(@Param("metalakeId") Long metalakeId) {
+ return getProvider().softDeletePolicyVersionsByMetalakeId(metalakeId);
+ }
+
+ static class PolicyVersionMySQLProvider extends PolicyVersionBaseSQLProvider
{}
+
+ static class PolicyVersionH2Provider extends PolicyVersionBaseSQLProvider {}
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyMetaBaseSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyMetaBaseSQLProvider.java
new file mode 100644
index 0000000000..1f1e525071
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyMetaBaseSQLProvider.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import static
org.apache.gravitino.storage.relational.mapper.PolicyMetaMapper.POLICY_META_TABLE_NAME;
+import static
org.apache.gravitino.storage.relational.mapper.PolicyVersionMapper.POLICY_VERSION_TABLE_NAME;
+
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.po.PolicyPO;
+import org.apache.ibatis.annotations.Param;
+
+public class PolicyMetaBaseSQLProvider {
+
+ public String listPolicyPOsByMetalake(@Param("metalakeName") String
metalakeName) {
+ return "SELECT pm.policy_id, pm.policy_name, pm.policy_type,
pm.metalake_id, pm.inheritable,"
+ + " pm.exclusive, pm.supported_object_types, pm.audit_info,
pm.current_version, pm.last_version,"
+ + " pm.deleted_at, pvi.id, pvi.metalake_id as version_metalake_id,
pvi.policy_id as version_policy_id,"
+ + " pvi.version, pvi.policy_comment, pvi.enabled, pvi.content,
pvi.deleted_at as version_deleted_at"
+ + " FROM "
+ + POLICY_META_TABLE_NAME
+ + " pm JOIN "
+ + MetalakeMetaMapper.TABLE_NAME
+ + " mm ON pm.metalake_id = mm.metalake_id"
+ + " JOIN "
+ + POLICY_VERSION_TABLE_NAME
+ + " pvi ON pm.policy_id = pvi.policy_id"
+ + " AND pm.current_version = pvi.version"
+ + " WHERE mm.metalake_name = #{metalakeName}"
+ + " AND pm.deleted_at = 0 AND mm.deleted_at = 0"
+ + " AND pvi.deleted_at = 0";
+ }
+
+ public String insertPolicyMetaOnDuplicateKeyUpdate(@Param("policyMeta")
PolicyPO policyPO) {
+ return "INSERT INTO "
+ + POLICY_META_TABLE_NAME
+ + " (policy_id, policy_name, policy_type, metalake_id, inheritable,
exclusive,"
+ + " supported_object_types, audit_info, current_version, last_version,
deleted_at)"
+ + " VALUES (#{policyMeta.policyId}, #{policyMeta.policyName},
#{policyMeta.policyType},"
+ + " #{policyMeta.metalakeId}, #{policyMeta.inheritable},
#{policyMeta.exclusive},"
+ + " #{policyMeta.supportedObjectTypes}, #{policyMeta.auditInfo},
#{policyMeta.currentVersion},"
+ + " #{policyMeta.lastVersion}, #{policyMeta.deletedAt})"
+ + " ON DUPLICATE KEY UPDATE"
+ + " policy_name = #{policyMeta.policyName},"
+ + " policy_type = #{policyMeta.policyType},"
+ + " metalake_id = #{policyMeta.metalakeId},"
+ + " inheritable = #{policyMeta.inheritable},"
+ + " exclusive = #{policyMeta.exclusive},"
+ + " supported_object_types = #{policyMeta.supportedObjectTypes},"
+ + " audit_info = #{policyMeta.auditInfo},"
+ + " current_version = #{policyMeta.currentVersion},"
+ + " last_version = #{policyMeta.lastVersion},"
+ + " deleted_at = #{policyMeta.deletedAt}";
+ }
+
+ public String insertPolicyMeta(@Param("policyMeta") PolicyPO policyPO) {
+ return "INSERT INTO "
+ + POLICY_META_TABLE_NAME
+ + " (policy_id, policy_name, policy_type, metalake_id, inheritable,
exclusive,"
+ + " supported_object_types, audit_info, current_version, last_version,
deleted_at)"
+ + " VALUES (#{policyMeta.policyId}, #{policyMeta.policyName},
#{policyMeta.policyType},"
+ + " #{policyMeta.metalakeId}, #{policyMeta.inheritable},
#{policyMeta.exclusive},"
+ + " #{policyMeta.supportedObjectTypes}, #{policyMeta.auditInfo},
#{policyMeta.currentVersion},"
+ + " #{policyMeta.lastVersion}, #{policyMeta.deletedAt})";
+ }
+
+ public String selectPolicyMetaByMetalakeAndName(
+ @Param("metalakeName") String metalakeName, @Param("policyName") String
policyName) {
+ return "SELECT pm.policy_id, pm.policy_name, pm.policy_type,
pm.metalake_id, pm.inheritable,"
+ + " pm.exclusive, pm.supported_object_types, pm.audit_info,
pm.current_version, pm.last_version,"
+ + " pm.deleted_at, pvi.id, pvi.metalake_id as version_metalake_id,
pvi.policy_id as version_policy_id,"
+ + " pvi.version, pvi.policy_comment, pvi.enabled, pvi.content,
pvi.deleted_at as version_deleted_at"
+ + " FROM "
+ + POLICY_META_TABLE_NAME
+ + " pm JOIN "
+ + MetalakeMetaMapper.TABLE_NAME
+ + " mm ON pm.metalake_id = mm.metalake_id"
+ + " JOIN "
+ + POLICY_VERSION_TABLE_NAME
+ + " pvi ON pm.policy_id = pvi.policy_id"
+ + " AND pm.current_version = pvi.version"
+ + " WHERE mm.metalake_name = #{metalakeName}"
+ + " AND pm.policy_name = #{policyName}"
+ + " AND pm.deleted_at = 0 AND mm.deleted_at = 0"
+ + " AND pvi.deleted_at = 0";
+ }
+
+ public String updatePolicyMeta(
+ @Param("newPolicyMeta") PolicyPO newPolicyMeta,
+ @Param("oldPolicyMeta") PolicyPO oldPolicyMeta) {
+ return "UPDATE "
+ + POLICY_META_TABLE_NAME
+ + " SET policy_name = #{newPolicyMeta.policyName},"
+ + " policy_type = #{newPolicyMeta.policyType},"
+ + " metalake_id = #{newPolicyMeta.metalakeId},"
+ + " inheritable = #{newPolicyMeta.inheritable},"
+ + " exclusive = #{newPolicyMeta.exclusive},"
+ + " supported_object_types = #{newPolicyMeta.supportedObjectTypes},"
+ + " audit_info = #{newPolicyMeta.auditInfo},"
+ + " current_version = #{newPolicyMeta.currentVersion},"
+ + " last_version = #{newPolicyMeta.lastVersion},"
+ + " deleted_at = #{newPolicyMeta.deletedAt}"
+ + " WHERE policy_id = #{oldPolicyMeta.policyId}"
+ + " AND policy_name = #{oldPolicyMeta.policyName}"
+ + " AND policy_type = #{oldPolicyMeta.policyType}"
+ + " AND metalake_id = #{oldPolicyMeta.metalakeId}"
+ + " AND inheritable = #{oldPolicyMeta.inheritable}"
+ + " AND exclusive = #{oldPolicyMeta.exclusive}"
+ + " AND supported_object_types = #{oldPolicyMeta.supportedObjectTypes}"
+ + " AND audit_info = #{oldPolicyMeta.auditInfo}"
+ + " AND current_version = #{oldPolicyMeta.currentVersion}"
+ + " AND last_version = #{oldPolicyMeta.lastVersion}"
+ + " AND deleted_at = 0";
+ }
+
+ public String softDeletePolicyByMetalakeAndPolicyName(
+ @Param("metalakeName") String metalakeName, @Param("policyName") String
policyName) {
+ return "UPDATE "
+ + POLICY_META_TABLE_NAME
+ + " pm SET pm.deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ + " WHERE pm.metalake_id IN ("
+ + " SELECT mm.metalake_id FROM "
+ + MetalakeMetaMapper.TABLE_NAME
+ + " mm WHERE mm.metalake_name = #{metalakeName} AND mm.deleted_at = 0)"
+ + " AND pm.policy_name = #{policyName} AND pm.deleted_at = 0";
+ }
+
+ public String deletePolicyMetasByLegacyTimeline(
+ @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit)
{
+ return "DELETE FROM "
+ + POLICY_META_TABLE_NAME
+ + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT
#{limit}";
+ }
+
+ public String softDeletePolicyMetasByMetalakeId(@Param("metalakeId") Long
metalakeId) {
+ return "UPDATE "
+ + POLICY_META_TABLE_NAME
+ + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+ + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+ + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyVersionBaseSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyVersionBaseSQLProvider.java
new file mode 100644
index 0000000000..2a4ceeca6f
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/PolicyVersionBaseSQLProvider.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import static
org.apache.gravitino.storage.relational.mapper.PolicyMetaMapper.POLICY_META_TABLE_NAME;
+import static
org.apache.gravitino.storage.relational.mapper.PolicyVersionMapper.POLICY_VERSION_TABLE_NAME;
+
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.po.PolicyVersionPO;
+import org.apache.ibatis.annotations.Param;
+
+/**
+ * Default SQL text for the policy version table. Each public method returns a statement with
+ * MyBatis {@code #{...}} placeholders; the method parameters only exist to name those bindings.
+ */
+public class PolicyVersionBaseSQLProvider {
+
+  /** Expression written into {@code deleted_at} by every soft-delete statement of this class. */
+  private static final String SOFT_DELETE_TIMESTAMP =
+      "(UNIX_TIMESTAMP() * 1000.0) + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000";
+
+  /** Plain INSERT of one version row, shared by both insert variants. */
+  private static String insertStatement() {
+    String columns =
+        " (metalake_id, policy_id, version, policy_comment, enabled, content, deleted_at) ";
+    String values =
+        "VALUES (#{policyVersion.metalakeId}, #{policyVersion.policyId},"
+            + " #{policyVersion.version}, #{policyVersion.policyComment},"
+            + " #{policyVersion.enabled}, #{policyVersion.content},"
+            + " #{policyVersion.deletedAt})";
+    return "INSERT INTO " + POLICY_VERSION_TABLE_NAME + columns + values;
+  }
+
+  /** Sub-select returning the ids of not-deleted metalakes named {@code #{metalakeName}}. */
+  private static String liveMetalakeIds() {
+    return " SELECT mm.metalake_id FROM "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm WHERE mm.metalake_name = #{metalakeName} AND mm.deleted_at = 0";
+  }
+
+  /**
+   * Insert of one version row that, on a duplicate key, overwrites every column with the supplied
+   * values.
+   *
+   * @param policyVersion bound to the {@code #{policyVersion.*}} placeholders
+   * @return the SQL text
+   */
+  public String insertPolicyVersionOnDuplicateKeyUpdate(
+      @Param("policyVersion") PolicyVersionPO policyVersion) {
+    String[] assignments = {
+      "metalake_id = #{policyVersion.metalakeId}",
+      "policy_id = #{policyVersion.policyId}",
+      "version = #{policyVersion.version}",
+      "policy_comment = #{policyVersion.policyComment}",
+      "enabled = #{policyVersion.enabled}",
+      "content = #{policyVersion.content}",
+      "deleted_at = #{policyVersion.deletedAt}"
+    };
+    return insertStatement() + " ON DUPLICATE KEY UPDATE " + String.join(", ", assignments);
+  }
+
+  /**
+   * Plain insert of one version row.
+   *
+   * @param policyVersion bound to the {@code #{policyVersion.*}} placeholders
+   * @return the SQL text
+   */
+  public String insertPolicyVersion(@Param("policyVersion") PolicyVersionPO policyVersion) {
+    return insertStatement();
+  }
+
+  /**
+   * Soft-deletes all not-deleted versions of the policy with the given name in the given metalake.
+   * The policy is located through its not-deleted row in the policy meta table.
+   *
+   * @param metalakeName bound to {@code #{metalakeName}}
+   * @param policyName bound to {@code #{policyName}}
+   * @return the SQL text
+   */
+  public String softDeletePolicyVersionByMetalakeAndPolicyName(
+      @Param("metalakeName") String metalakeName, @Param("policyName") String policyName) {
+    String metalakeIds = liveMetalakeIds();
+    String livePolicyIds =
+        " SELECT pm.policy_id FROM "
+            + POLICY_META_TABLE_NAME
+            + " pm WHERE pm.policy_name = #{policyName} AND pm.deleted_at = 0"
+            + " AND pm.metalake_id IN ("
+            + metalakeIds
+            + ")";
+    return "UPDATE "
+        + POLICY_VERSION_TABLE_NAME
+        + " pv SET pv.deleted_at = "
+        + SOFT_DELETE_TIMESTAMP
+        + " WHERE pv.metalake_id IN ("
+        + metalakeIds
+        + ")"
+        + " AND pv.policy_id IN ("
+        + livePolicyIds
+        + ")"
+        + " AND pv.deleted_at = 0";
+  }
+
+  /**
+   * Physically removes version rows whose {@code deleted_at} is greater than 0 and smaller than
+   * the given timeline.
+   *
+   * @param legacyTimeline bound to {@code #{legacyTimeline}}
+   * @param limit bound to {@code #{limit}}
+   * @return the SQL text
+   */
+  public String deletePolicyVersionsByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) {
+    String filter = " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}";
+    return "DELETE FROM " + POLICY_VERSION_TABLE_NAME + filter;
+  }
+
+  /**
+   * Selects, per policy, the highest not-deleted version number that is above the retention count.
+   * The result columns are aliased to {@code policyId} and {@code version}.
+   *
+   * @param versionRetentionCount bound to {@code #{versionRetentionCount}}
+   * @return the SQL text
+   */
+  public String selectPolicyVersionsByRetentionCount(
+      @Param("versionRetentionCount") Long versionRetentionCount) {
+    String projection = "SELECT policy_id as policyId, Max(version) as version FROM ";
+    String filter = " WHERE version > #{versionRetentionCount} AND deleted_at = 0 ";
+    return projection + POLICY_VERSION_TABLE_NAME + filter + "GROUP BY policy_id";
+  }
+
+  /**
+   * Soft-deletes not-deleted versions of one policy whose version number is at or below the
+   * retention line.
+   *
+   * @param policyId bound to {@code #{policyId}}
+   * @param versionRetentionLine bound to {@code #{versionRetentionLine}} (inclusive)
+   * @param limit bound to {@code #{limit}}
+   * @return the SQL text
+   */
+  public String softDeletePolicyVersionsByRetentionLine(
+      @Param("policyId") Long policyId,
+      @Param("versionRetentionLine") long versionRetentionLine,
+      @Param("limit") int limit) {
+    StringBuilder sql = new StringBuilder("UPDATE ");
+    sql.append(POLICY_VERSION_TABLE_NAME);
+    sql.append(" SET deleted_at = ").append(SOFT_DELETE_TIMESTAMP);
+    sql.append(" WHERE policy_id = #{policyId} AND version <= #{versionRetentionLine}");
+    sql.append(" AND deleted_at = 0");
+    sql.append(" LIMIT #{limit}");
+    return sql.toString();
+  }
+
+  /**
+   * Soft-deletes every not-deleted version row that belongs to one metalake.
+   *
+   * @param metalakeId bound to {@code #{metalakeId}}
+   * @return the SQL text
+   */
+  public String softDeletePolicyVersionsByMetalakeId(@Param("metalakeId") Long metalakeId) {
+    String filter = " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+    return "UPDATE "
+        + POLICY_VERSION_TABLE_NAME
+        + " SET deleted_at = "
+        + SOFT_DELETE_TIMESTAMP
+        + filter;
+  }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/PolicyMetaPostgreSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/PolicyMetaPostgreSQLProvider.java
new file mode 100644
index 0000000000..8a2274af9f
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/PolicyMetaPostgreSQLProvider.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import static org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper.META_TABLE_NAME;
+import static org.apache.gravitino.storage.relational.mapper.PolicyMetaMapper.POLICY_META_TABLE_NAME;
+
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.provider.base.PolicyMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.PolicyPO;
+
+/**
+ * PostgreSQL flavour of the policy meta statements. It overrides the base statements that rely on
+ * MySQL-only syntax ({@code UNIX_TIMESTAMP()}, {@code DELETE ... LIMIT},
+ * {@code ON DUPLICATE KEY UPDATE}).
+ *
+ * <p>All statements operate on the policy meta table, i.e. the same table as the base provider.
+ */
+public class PolicyMetaPostgreSQLProvider extends PolicyMetaBaseSQLProvider {
+
+  /** Expression written into {@code deleted_at} by the soft-delete statements of this class. */
+  private static final String SOFT_DELETE_TIMESTAMP =
+      "floor(extract(epoch from((current_timestamp -"
+          + " timestamp '1970-01-01 00:00:00')*1000)))";
+
+  /**
+   * Soft-deletes the not-deleted policy with the given name in the given metalake.
+   *
+   * @param metalakeName bound to {@code #{metalakeName}}
+   * @param policyName bound to {@code #{policyName}}
+   * @return the SQL text with MyBatis placeholders
+   */
+  @Override
+  public String softDeletePolicyByMetalakeAndPolicyName(String metalakeName, String policyName) {
+    return "UPDATE "
+        + POLICY_META_TABLE_NAME
+        + " SET deleted_at = "
+        + SOFT_DELETE_TIMESTAMP
+        + " WHERE metalake_id = (SELECT metalake_id FROM "
+        + MetalakeMetaMapper.TABLE_NAME
+        + " mm WHERE mm.metalake_name = #{metalakeName} AND mm.deleted_at = 0)"
+        + " AND policy_name = #{policyName} AND deleted_at = 0";
+  }
+
+  /**
+   * Soft-deletes every not-deleted policy of one metalake.
+   *
+   * @param metalakeId bound to {@code #{metalakeId}}
+   * @return the SQL text with MyBatis placeholders
+   */
+  @Override
+  public String softDeletePolicyMetasByMetalakeId(Long metalakeId) {
+    return "UPDATE "
+        + POLICY_META_TABLE_NAME
+        + " SET deleted_at = "
+        + SOFT_DELETE_TIMESTAMP
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  /**
+   * Physically removes soft-deleted policies whose {@code deleted_at} is older than the timeline.
+   *
+   * <p>PostgreSQL has no {@code DELETE ... LIMIT}, so the limit is applied in a sub-select on the
+   * primary id. The predicate matches the base provider: only rows that are already soft-deleted
+   * ({@code deleted_at > 0}) and older than {@code legacyTimeline} qualify.
+   *
+   * @param legacyTimeline bound to {@code #{legacyTimeline}}, the exclusive upper bound
+   * @param limit bound to {@code #{limit}}
+   * @return the SQL text with MyBatis placeholders
+   */
+  @Override
+  public String deletePolicyMetasByLegacyTimeline(Long legacyTimeline, int limit) {
+    return "DELETE FROM "
+        + POLICY_META_TABLE_NAME
+        + " WHERE policy_id IN (SELECT policy_id FROM "
+        + POLICY_META_TABLE_NAME
+        + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
+  }
+
+  /**
+   * Inserts a policy meta row, or overwrites the existing row with the same {@code policy_id}.
+   *
+   * @param policyPO bound to the {@code #{policyPO.*}} placeholders
+   * @return the SQL text with MyBatis placeholders
+   */
+  @Override
+  public String insertPolicyMetaOnDuplicateKeyUpdate(PolicyPO policyPO) {
+    return "INSERT INTO "
+        + POLICY_META_TABLE_NAME
+        + " (policy_id, policy_name, policy_type, metalake_id, inheritable, exclusive,"
+        + " supported_object_types, audit_info, current_version, last_version, deleted_at)"
+        + " VALUES ("
+        + " #{policyPO.policyId},"
+        + " #{policyPO.policyName},"
+        + " #{policyPO.policyType},"
+        + " #{policyPO.metalakeId},"
+        + " #{policyPO.inheritable},"
+        + " #{policyPO.exclusive},"
+        + " #{policyPO.supportedObjectTypes},"
+        + " #{policyPO.auditInfo},"
+        + " #{policyPO.currentVersion},"
+        + " #{policyPO.lastVersion},"
+        + " #{policyPO.deletedAt})"
+        + " ON CONFLICT (policy_id) DO UPDATE SET"
+        + " policy_name = #{policyPO.policyName},"
+        + " policy_type = #{policyPO.policyType},"
+        + " metalake_id = #{policyPO.metalakeId},"
+        + " inheritable = #{policyPO.inheritable},"
+        + " exclusive = #{policyPO.exclusive},"
+        + " supported_object_types = #{policyPO.supportedObjectTypes},"
+        + " audit_info = #{policyPO.auditInfo},"
+        + " current_version = #{policyPO.currentVersion},"
+        // Every assignment but the last needs a trailing comma, otherwise the statement is invalid.
+        + " last_version = #{policyPO.lastVersion},"
+        + " deleted_at = #{policyPO.deletedAt}";
+  }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/PolicyVersionPostgreSQLProvider.java
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/PolicyVersionPostgreSQLProvider.java
new file mode 100644
index 0000000000..9e396937b8
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/PolicyVersionPostgreSQLProvider.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import static
org.apache.gravitino.storage.relational.mapper.PolicyMetaMapper.POLICY_META_TABLE_NAME;
+import static
org.apache.gravitino.storage.relational.mapper.PolicyVersionMapper.POLICY_VERSION_TABLE_NAME;
+
+import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import
org.apache.gravitino.storage.relational.mapper.provider.base.PolicyVersionBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.PolicyVersionPO;
+
+/**
+ * PostgreSQL flavour of the policy version statements. It overrides the base statements that rely
+ * on MySQL-only syntax ({@code UNIX_TIMESTAMP()}, {@code UPDATE/DELETE ... LIMIT},
+ * {@code ON DUPLICATE KEY UPDATE}) while keeping the same row selection as the base provider.
+ */
+public class PolicyVersionPostgreSQLProvider extends PolicyVersionBaseSQLProvider {
+
+  /** Expression written into {@code deleted_at} by the soft-delete statements of this class. */
+  private static final String SOFT_DELETE_TIMESTAMP =
+      "floor(extract(epoch from((current_timestamp -"
+          + " timestamp '1970-01-01 00:00:00')*1000)))";
+
+  /**
+   * Soft-deletes all not-deleted versions of the policy with the given name in the given metalake.
+   *
+   * <p>The policy sub-select is restricted to the same metalake, as in the base provider. Without
+   * that restriction two metalakes holding a policy of the same name would make the sub-select
+   * return several rows.
+   *
+   * @param metalakeName bound to {@code #{metalakeName}}
+   * @param policyName bound to {@code #{policyName}}
+   * @return the SQL text with MyBatis placeholders
+   */
+  @Override
+  public String softDeletePolicyVersionByMetalakeAndPolicyName(
+      String metalakeName, String policyName) {
+    String liveMetalakeIds =
+        "SELECT mm.metalake_id FROM "
+            + MetalakeMetaMapper.TABLE_NAME
+            + " mm WHERE mm.metalake_name = #{metalakeName} AND mm.deleted_at = 0";
+    return "UPDATE "
+        + POLICY_VERSION_TABLE_NAME
+        + " SET deleted_at = "
+        + SOFT_DELETE_TIMESTAMP
+        + " WHERE metalake_id IN ("
+        + liveMetalakeIds
+        + ")"
+        + " AND policy_id IN (SELECT pm.policy_id FROM "
+        + POLICY_META_TABLE_NAME
+        + " pm WHERE pm.policy_name = #{policyName} AND pm.deleted_at = 0"
+        + " AND pm.metalake_id IN ("
+        + liveMetalakeIds
+        + "))"
+        + " AND deleted_at = 0";
+  }
+
+  /**
+   * Physically removes soft-deleted version rows whose {@code deleted_at} is older than the
+   * timeline. The limit is applied through a sub-select because PostgreSQL has no
+   * {@code DELETE ... LIMIT}.
+   *
+   * @param legacyTimeline bound to {@code #{legacyTimeline}}, the exclusive upper bound
+   * @param limit bound to {@code #{limit}}
+   * @return the SQL text with MyBatis placeholders
+   */
+  @Override
+  public String deletePolicyVersionsByLegacyTimeline(Long legacyTimeline, int limit) {
+    return "DELETE FROM "
+        + POLICY_VERSION_TABLE_NAME
+        + " WHERE id IN (SELECT id FROM "
+        + POLICY_VERSION_TABLE_NAME
+        + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit})";
+  }
+
+  /**
+   * Soft-deletes not-deleted versions of one policy whose version number is at or below the
+   * retention line (inclusive, as in the base provider).
+   *
+   * <p>PostgreSQL does not accept {@code LIMIT} on {@code UPDATE}, so the rows are picked in a
+   * sub-select on {@code id}.
+   *
+   * @param policyId bound to {@code #{policyId}}
+   * @param versionRetentionLine bound to {@code #{versionRetentionLine}}
+   * @param limit bound to {@code #{limit}}
+   * @return the SQL text with MyBatis placeholders
+   */
+  @Override
+  public String softDeletePolicyVersionsByRetentionLine(
+      Long policyId, long versionRetentionLine, int limit) {
+    return "UPDATE "
+        + POLICY_VERSION_TABLE_NAME
+        + " SET deleted_at = "
+        + SOFT_DELETE_TIMESTAMP
+        + " WHERE id IN (SELECT id FROM "
+        + POLICY_VERSION_TABLE_NAME
+        + " WHERE policy_id = #{policyId} AND version <= #{versionRetentionLine}"
+        + " AND deleted_at = 0 LIMIT #{limit})";
+  }
+
+  /**
+   * Soft-deletes every not-deleted version row that belongs to one metalake.
+   *
+   * @param metalakeId bound to {@code #{metalakeId}}
+   * @return the SQL text with MyBatis placeholders
+   */
+  @Override
+  public String softDeletePolicyVersionsByMetalakeId(Long metalakeId) {
+    return "UPDATE "
+        + POLICY_VERSION_TABLE_NAME
+        + " SET deleted_at = "
+        + SOFT_DELETE_TIMESTAMP
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  /**
+   * Inserts a version row, or overwrites the row that conflicts on
+   * {@code (policy_id, version, deleted_at)}.
+   *
+   * @param policyVersion bound to the {@code #{policyVersion.*}} placeholders
+   * @return the SQL text with MyBatis placeholders
+   */
+  @Override
+  public String insertPolicyVersionOnDuplicateKeyUpdate(PolicyVersionPO policyVersion) {
+    return "INSERT INTO "
+        + POLICY_VERSION_TABLE_NAME
+        + " (metalake_id, policy_id, version, policy_comment, enabled,"
+        + " content, deleted_at)"
+        + " VALUES ("
+        + " #{policyVersion.metalakeId},"
+        + " #{policyVersion.policyId},"
+        + " #{policyVersion.version},"
+        + " #{policyVersion.policyComment},"
+        + " #{policyVersion.enabled},"
+        + " #{policyVersion.content},"
+        + " #{policyVersion.deletedAt})"
+        + " ON CONFLICT (policy_id, version, deleted_at) DO UPDATE SET"
+        // metalake_id is refreshed as well, matching the base provider's upsert.
+        + " metalake_id = #{policyVersion.metalakeId},"
+        + " policy_comment = #{policyVersion.policyComment},"
+        + " enabled = #{policyVersion.enabled},"
+        + " content = #{policyVersion.content},"
+        + " deleted_at = #{policyVersion.deletedAt}";
+  }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/po/PolicyMaxVersionPO.java
b/core/src/main/java/org/apache/gravitino/storage/relational/po/PolicyMaxVersionPO.java
new file mode 100644
index 0000000000..3f01b0e7b4
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/po/PolicyMaxVersionPO.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.po;
+
+import java.util.Objects;
+import lombok.Getter;
+
+/**
+ * Row holder pairing a policy id with a version number. Getters are generated by Lombok; the
+ * fields have no setters or constructor arguments in this class.
+ */
+@Getter
+public class PolicyMaxVersionPO {
+  private Long policyId;
+  private Long version;
+
+  /** Two instances are equal when both the policy id and the version are equal. */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof PolicyMaxVersionPO)) {
+      return false;
+    }
+    PolicyMaxVersionPO other = (PolicyMaxVersionPO) o;
+    if (!Objects.equals(policyId, other.policyId)) {
+      return false;
+    }
+    return Objects.equals(version, other.version);
+  }
+
+  /** Hash over the same two fields that {@link #equals(Object)} compares. */
+  @Override
+  public int hashCode() {
+    return Objects.hash(policyId, version);
+  }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
index 75e217279d..ac7b32306b 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
@@ -41,6 +41,8 @@ import
org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
import
org.apache.gravitino.storage.relational.mapper.ModelVersionAliasRelMapper;
import org.apache.gravitino.storage.relational.mapper.ModelVersionMetaMapper;
import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.PolicyMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.PolicyVersionMapper;
import org.apache.gravitino.storage.relational.mapper.RoleMetaMapper;
import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
@@ -243,6 +245,14 @@ public class MetalakeMetaService {
SessionUtils.doWithoutCommit(
TagMetadataObjectRelMapper.class,
mapper ->
mapper.softDeleteTagMetadataObjectRelsByMetalakeId(metalakeId)),
+ () ->
+ SessionUtils.doWithoutCommit(
+ PolicyMetaMapper.class,
+ mapper ->
mapper.softDeletePolicyMetasByMetalakeId(metalakeId)),
+ () ->
+ SessionUtils.doWithoutCommit(
+ PolicyVersionMapper.class,
+ mapper ->
mapper.softDeletePolicyVersionsByMetalakeId(metalakeId)),
() ->
SessionUtils.doWithoutCommit(
OwnerMetaMapper.class,
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
new file mode 100644
index 0000000000..8176224622
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/PolicyMetaService.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.meta.PolicyEntity;
+import org.apache.gravitino.storage.relational.mapper.PolicyMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.PolicyVersionMapper;
+import org.apache.gravitino.storage.relational.po.PolicyMaxVersionPO;
+import org.apache.gravitino.storage.relational.po.PolicyPO;
+import org.apache.gravitino.storage.relational.utils.ExceptionUtils;
+import org.apache.gravitino.storage.relational.utils.POConverters;
+import org.apache.gravitino.storage.relational.utils.SessionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PolicyMetaService {
+ // The single shared instance handed out by getInstance().
+ private static final PolicyMetaService INSTANCE = new PolicyMetaService();
+ private static final Logger LOG =
LoggerFactory.getLogger(PolicyMetaService.class);
+
+ /**
+ * Returns the shared {@link PolicyMetaService} instance.
+ *
+ * @return the singleton instance
+ */
+ public static PolicyMetaService getInstance() {
+ return INSTANCE;
+ }
+
+ // Private so that the only instance is the one created for INSTANCE.
+ private PolicyMetaService() {}
+
+  /**
+   * Lists the policies of the metalake named by the first level of {@code namespace}.
+   *
+   * @param namespace the policy namespace; it is also attached to every returned entity
+   * @return the policies converted to entities
+   */
+  public List<PolicyEntity> listPoliciesByNamespace(Namespace namespace) {
+    String metalake = namespace.level(0);
+    List<PolicyPO> rows =
+        SessionUtils.getWithoutCommit(
+            PolicyMetaMapper.class, policyMapper -> policyMapper.listPolicyPOsByMetalake(metalake));
+
+    Function<PolicyPO, PolicyEntity> toEntity = row -> POConverters.fromPolicyPO(row, namespace);
+    return rows.stream().map(toEntity).collect(Collectors.toList());
+  }
+
+  /**
+   * Loads one policy by identifier.
+   *
+   * @param ident the policy identifier; the first namespace level is the metalake name
+   * @return the policy entity
+   * @throws NoSuchEntityException if no such policy row is found
+   */
+  public PolicyEntity getPolicyByIdentifier(NameIdentifier ident) {
+    String metalake = ident.namespace().level(0);
+    String policyName = ident.name();
+    PolicyPO row = getPolicyPOByMetalakeAndName(metalake, policyName);
+    Namespace policyNamespace = ident.namespace();
+    return POConverters.fromPolicyPO(row, policyNamespace);
+  }
+
+  /**
+   * Stores a new policy: one row in the policy meta table and its initial row in the policy
+   * version table, both written through a single {@code doMultipleWithCommit} call.
+   *
+   * @param policyEntity the policy to store; the first namespace level is the metalake name
+   * @param overwritten when {@code true} the "on duplicate key update" variants are used,
+   *     otherwise the plain inserts
+   * @throws IOException declared for callers; runtime failures are first passed to
+   *     {@code ExceptionUtils.checkSQLException} and then rethrown unchanged
+   */
+  public void insertPolicy(PolicyEntity policyEntity, boolean overwritten) throws IOException {
+    String metalakeName = policyEntity.namespace().level(0);
+
+    try {
+      Long metalakeId = MetalakeMetaService.getInstance().getMetalakeIdByName(metalakeName);
+      PolicyPO policyPO =
+          POConverters.initializePolicyPOWithVersion(
+              policyEntity, PolicyPO.builder().withMetalakeId(metalakeId));
+
+      // Meta row first, then the version row.
+      SessionUtils.doMultipleWithCommit(
+          () ->
+              SessionUtils.doWithoutCommit(
+                  PolicyMetaMapper.class,
+                  metaMapper -> {
+                    if (!overwritten) {
+                      metaMapper.insertPolicyMeta(policyPO);
+                      return;
+                    }
+                    metaMapper.insertPolicyMetaOnDuplicateKeyUpdate(policyPO);
+                  }),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  PolicyVersionMapper.class,
+                  versionMapper -> {
+                    if (!overwritten) {
+                      versionMapper.insertPolicyVersion(policyPO.getPolicyVersionPO());
+                      return;
+                    }
+                    versionMapper.insertPolicyVersionOnDuplicateKeyUpdate(
+                        policyPO.getPolicyVersionPO());
+                  }));
+    } catch (RuntimeException failure) {
+      ExceptionUtils.checkSQLException(
+          failure, Entity.EntityType.POLICY, policyEntity.toString());
+      throw failure;
+    }
+  }
+
+ /**
+ * Loads the policy named by {@code ident}, applies {@code updater} to it and persists the result.
+ *
+ * <p>If {@code POConverters.checkPolicyVersionNeedUpdate} reports a change, a new version row is
+ * inserted and the meta row is updated in one {@code doMultipleWithCommit} call; otherwise only
+ * the meta row is updated.
+ *
+ * @param ident the policy identifier; the first namespace level is the metalake name
+ * @param updater produces the updated entity; the result must keep the id of the old entity
+ * @return the updated entity
+ * @throws IOException if the meta-only update reports no updated row
+ * @throws NoSuchEntityException if the policy to update does not exist
+ */
+ public <E extends Entity & HasIdentifier> PolicyEntity updatePolicy(
+ NameIdentifier ident, Function<E, E> updater) throws IOException {
+ String metalakeName = ident.namespace().level(0);
+
+ PolicyPO oldPolicyPO = getPolicyPOByMetalakeAndName(metalakeName,
ident.name());
+ PolicyEntity oldPolicyEntity = POConverters.fromPolicyPO(oldPolicyPO,
ident.namespace());
+ PolicyEntity updatedPolicyEntity = (PolicyEntity) updater.apply((E)
oldPolicyEntity);
+ // The id is the row key of the update below, so the updater must not change it.
+ Preconditions.checkArgument(
+ Objects.equals(oldPolicyEntity.id(), updatedPolicyEntity.id()),
+ "The updated policy entity id: %s must have the same id as the old
entity id %s",
+ updatedPolicyEntity.id(),
+ oldPolicyEntity.id());
+
+ Integer updateResult;
+ try {
+ boolean checkNeedUpdateVersion =
+ POConverters.checkPolicyVersionNeedUpdate(
+ oldPolicyPO.getPolicyVersionPO(), updatedPolicyEntity);
+ PolicyPO newPolicyPO =
+ POConverters.updatePolicyPOWithVersion(
+ oldPolicyPO, updatedPolicyEntity, checkNeedUpdateVersion);
+ if (checkNeedUpdateVersion) {
+ // Insert the new version row first, then point the meta row at it.
+ SessionUtils.doMultipleWithCommit(
+ () ->
+ SessionUtils.doWithoutCommit(
+ PolicyVersionMapper.class,
+ mapper ->
mapper.insertPolicyVersion(newPolicyPO.getPolicyVersionPO())),
+ () ->
+ SessionUtils.doWithoutCommit(
+ PolicyMetaMapper.class,
+ mapper -> mapper.updatePolicyMeta(newPolicyPO,
oldPolicyPO)));
+ // we set the updateResult to 1 to indicate that the update is successful
+ // NOTE(review): the row count of updatePolicyMeta is discarded on this path, so an update
+ // that matched no row is still reported as success - confirm this is intended.
+ updateResult = 1;
+ } else {
+ updateResult =
+ SessionUtils.doWithCommitAndFetchResult(
+ PolicyMetaMapper.class,
+ mapper -> mapper.updatePolicyMeta(newPolicyPO, oldPolicyPO));
+ }
+ } catch (RuntimeException re) {
+ ExceptionUtils.checkSQLException(
+ re, Entity.EntityType.POLICY,
updatedPolicyEntity.nameIdentifier().toString());
+ throw re;
+ }
+
+ if (updateResult > 0) {
+ return updatedPolicyEntity;
+ } else {
+ throw new IOException("Failed to update the entity: " +
updatedPolicyEntity);
+ }
+ }
+
+  /**
+   * Soft-deletes a policy together with all of its versions in one {@code doMultipleWithCommit}
+   * call.
+   *
+   * @param ident the policy identifier; the first namespace level is the metalake name
+   * @return {@code true} if at least one meta or version row was soft-deleted
+   */
+  public boolean deletePolicy(NameIdentifier ident) {
+    String metalakeName = ident.namespace().level(0);
+    int[] policyMetaDeletedCount = new int[] {0};
+    int[] policyVersionDeletedCount = new int[] {0};
+
+    // The version statement finds the policy id through the meta row with deleted_at = 0, so it
+    // must run before the meta row is soft-deleted; otherwise it matches nothing and the version
+    // rows are left behind.
+    SessionUtils.doMultipleWithCommit(
+        () ->
+            policyVersionDeletedCount[0] =
+                SessionUtils.doWithoutCommitAndFetchResult(
+                    PolicyVersionMapper.class,
+                    mapper ->
+                        mapper.softDeletePolicyVersionByMetalakeAndPolicyName(
+                            metalakeName, ident.name())),
+        () ->
+            policyMetaDeletedCount[0] =
+                SessionUtils.doWithoutCommitAndFetchResult(
+                    PolicyMetaMapper.class,
+                    mapper ->
+                        mapper.softDeletePolicyByMetalakeAndPolicyName(
+                            metalakeName, ident.name())));
+    return policyMetaDeletedCount[0] + policyVersionDeletedCount[0] > 0;
+  }
+
+  /**
+   * Physically removes policy meta rows and then policy version rows selected by the legacy
+   * timeline. Each table is purged with its own committed call.
+   *
+   * @param legacyTimeline passed to both purge statements
+   * @param limit passed to both purge statements, so it applies per table
+   * @return the number of meta rows plus the number of version rows removed
+   */
+  public int deletePolicyAndVersionMetasByLegacyTimeline(Long legacyTimeline, int limit) {
+    int purged =
+        SessionUtils.doWithCommitAndFetchResult(
+            PolicyMetaMapper.class,
+            metaMapper -> metaMapper.deletePolicyMetasByLegacyTimeline(legacyTimeline, limit));
+
+    purged +=
+        SessionUtils.doWithCommitAndFetchResult(
+            PolicyVersionMapper.class,
+            versionMapper ->
+                versionMapper.deletePolicyVersionsByLegacyTimeline(legacyTimeline, limit));
+
+    return purged;
+  }
+
+  /**
+   * Soft-deletes old policy versions so that, per policy, only the newest
+   * {@code versionRetentionCount} version numbers stay below the maximum.
+   *
+   * @param versionRetentionCount number of version numbers to keep per policy
+   * @param limit passed to each soft-delete statement, i.e. it applies per policy
+   * @return the total number of version rows soft-deleted
+   */
+  public int deletePolicyVersionsByRetentionCount(Long versionRetentionCount, int limit) {
+    // One entry per policy whose maximum version exceeds the retention count.
+    List<PolicyMaxVersionPO> candidates =
+        SessionUtils.getWithoutCommit(
+            PolicyVersionMapper.class,
+            mapper -> mapper.selectPolicyVersionsByRetentionCount(versionRetentionCount));
+
+    int total = 0;
+    for (PolicyMaxVersionPO candidate : candidates) {
+      // Versions at or below (maxVersion - versionRetentionCount) are soft-deleted.
+      long versionRetentionLine = candidate.getVersion() - versionRetentionCount;
+      int softDeleted =
+          SessionUtils.doWithCommitAndFetchResult(
+              PolicyVersionMapper.class,
+              mapper ->
+                  mapper.softDeletePolicyVersionsByRetentionLine(
+                      candidate.getPolicyId(), versionRetentionLine, limit));
+      total += softDeleted;
+
+      LOG.info(
+          "Soft delete policyVersions count: {} which versions are smaller than or equal to"
+              + " versionRetentionLine: {}, the current policyId and maxVersion is: <{}, {}>.",
+          softDeleted,
+          versionRetentionLine,
+          candidate.getPolicyId(),
+          candidate.getVersion());
+    }
+    return total;
+  }
+
+  /**
+   * Reads the policy row for the given metalake and policy name.
+   *
+   * @param metalakeName the metalake name
+   * @param policyName the policy name
+   * @return the row, never {@code null}
+   * @throws NoSuchEntityException if the mapper returns no row
+   */
+  private PolicyPO getPolicyPOByMetalakeAndName(String metalakeName, String policyName) {
+    // NOTE(review): the mapper method is named selectTagMeta... although it is called on
+    // PolicyMetaMapper - looks like a copy/paste name; verify against the mapper interface.
+    PolicyPO row =
+        SessionUtils.getWithoutCommit(
+            PolicyMetaMapper.class,
+            policyMapper -> policyMapper.selectTagMetaByMetalakeAndName(metalakeName, policyName));
+
+    if (row != null) {
+      return row;
+    }
+    throw new NoSuchEntityException(
+        NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+        Entity.EntityType.POLICY.name().toLowerCase(),
+        policyName);
+  }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
index 2b299886ee..5763f96d37 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Lists;
import java.time.Instant;
import java.util.Collections;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -59,6 +60,7 @@ import org.apache.gravitino.meta.TagEntity;
import org.apache.gravitino.meta.TopicEntity;
import org.apache.gravitino.meta.UserEntity;
import org.apache.gravitino.policy.Policy;
+import org.apache.gravitino.policy.PolicyContent;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.expressions.Expression;
import org.apache.gravitino.rel.types.Type;
@@ -76,6 +78,7 @@ import
org.apache.gravitino.storage.relational.po.ModelVersionAliasRelPO;
import org.apache.gravitino.storage.relational.po.ModelVersionPO;
import org.apache.gravitino.storage.relational.po.OwnerRelPO;
import org.apache.gravitino.storage.relational.po.PolicyPO;
+import org.apache.gravitino.storage.relational.po.PolicyVersionPO;
import org.apache.gravitino.storage.relational.po.RolePO;
import org.apache.gravitino.storage.relational.po.SchemaPO;
import org.apache.gravitino.storage.relational.po.SecurableObjectPO;
@@ -677,6 +680,78 @@ public class POConverters {
}
}
+  /**
+   * Decides whether updating a policy requires a new version row.
+   *
+   * <p>A new version is needed when the comment, the enabled flag or the content differs between
+   * the stored version and the new entity. The stored content is deserialized with the content
+   * class of the new entity's policy type before it is compared.
+   *
+   * @param oldPolicyVersionPO the currently stored version
+   * @param newPolicy the updated entity
+   * @return {@code true} if any of the versioned attributes changed
+   * @throws RuntimeException if the stored content cannot be deserialized
+   */
+  public static boolean checkPolicyVersionNeedUpdate(
+      PolicyVersionPO oldPolicyVersionPO, PolicyEntity newPolicy) {
+    if (!StringUtils.equals(oldPolicyVersionPO.getPolicyComment(), newPolicy.comment())) {
+      return true;
+    }
+    if (oldPolicyVersionPO.isEnabled() != newPolicy.enabled()) {
+      return true;
+    }
+
+    PolicyContent storedContent;
+    try {
+      storedContent =
+          JsonUtils.anyFieldMapper()
+              .readValue(
+                  oldPolicyVersionPO.getContent(),
+                  Policy.BuiltInType.fromPolicyType(newPolicy.policyType()).contentClass());
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to deserialize json object:", e);
+    }
+
+    // A stored null content only counts as changed when the new content is present.
+    return storedContent == null
+        ? newPolicy.content() != null
+        : !storedContent.equals(newPolicy.content());
+  }
+
+  /**
+   * Builds the {@link PolicyPO} that replaces {@code oldPolicyPO} after an update.
+   *
+   * <p>With {@code needUpdateVersion} the last version is incremented, becomes the current
+   * version, and a fresh {@link PolicyVersionPO} is attached. Otherwise the version numbers and
+   * the version PO of the old row are carried over.
+   *
+   * @param oldPolicyPO the stored row; supplies the metalake id and the version counters
+   * @param newPolicy the updated entity; supplies every other attribute
+   * @param needUpdateVersion whether a new version row has to be created
+   * @return the new policy PO with {@code deleted_at} set to the default value
+   * @throws RuntimeException if any attribute cannot be serialized to JSON
+   */
+  public static PolicyPO updatePolicyPOWithVersion(
+      PolicyPO oldPolicyPO, PolicyEntity newPolicy, boolean needUpdateVersion) {
+    try {
+      Long lastVersion;
+      Long currentVersion;
+      PolicyVersionPO versionPO;
+      if (!needUpdateVersion) {
+        lastVersion = oldPolicyPO.getLastVersion();
+        currentVersion = oldPolicyPO.getCurrentVersion();
+        versionPO = oldPolicyPO.getPolicyVersionPO();
+      } else {
+        // The new version number is the last version + 1 and becomes the current one.
+        lastVersion = oldPolicyPO.getLastVersion() + 1;
+        currentVersion = lastVersion;
+        versionPO =
+            PolicyVersionPO.builder()
+                .withMetalakeId(oldPolicyPO.getMetalakeId())
+                .withPolicyId(newPolicy.id())
+                .withVersion(currentVersion)
+                .withPolicyComment(newPolicy.comment())
+                .withEnabled(newPolicy.enabled())
+                .withContent(JsonUtils.anyFieldMapper().writeValueAsString(newPolicy.content()))
+                .withDeletedAt(DEFAULT_DELETED_AT)
+                .build();
+      }
+
+      // Sorted so that the stored JSON has a consistent ordering.
+      LinkedHashSet<String> sortedObjectTypes =
+          newPolicy.supportedObjectTypes().stream()
+              .map(Enum::name)
+              .sorted()
+              .collect(Collectors.toCollection(LinkedHashSet::new));
+      String supportedObjectTypesJson =
+          JsonUtils.anyFieldMapper().writeValueAsString(sortedObjectTypes);
+      String auditInfoJson = JsonUtils.anyFieldMapper().writeValueAsString(newPolicy.auditInfo());
+
+      return PolicyPO.builder()
+          .withPolicyId(newPolicy.id())
+          .withPolicyName(newPolicy.name())
+          .withPolicyType(newPolicy.policyType())
+          .withMetalakeId(oldPolicyPO.getMetalakeId())
+          .withInheritable(newPolicy.inheritable())
+          .withExclusive(newPolicy.exclusive())
+          .withSupportedObjectTypes(supportedObjectTypesJson)
+          .withAuditInfo(auditInfoJson)
+          .withCurrentVersion(currentVersion)
+          .withLastVersion(lastVersion)
+          .withDeletedAt(DEFAULT_DELETED_AT)
+          .withPolicyVersionPO(versionPO)
+          .build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to serialize json object:", e);
+    }
+  }
+
/**
* Convert {@link FilesetPO} to {@link FilesetEntity}
*
@@ -1339,6 +1414,45 @@ public class POConverters {
}
}
+  /**
+   * Builds the {@link PolicyPO} for a newly created policy, including its initial
+   * {@link PolicyVersionPO}. Current and last version are both set to {@code INIT_VERSION}.
+   *
+   * @param policyEntity the policy to convert
+   * @param builder a builder whose metalake id is already set; it is completed and built here
+   * @return the policy PO with {@code deleted_at} set to the default value
+   * @throws RuntimeException if any attribute cannot be serialized to JSON
+   */
+  public static PolicyPO initializePolicyPOWithVersion(
+      PolicyEntity policyEntity, PolicyPO.Builder builder) {
+    try {
+      String contentJson = JsonUtils.anyFieldMapper().writeValueAsString(policyEntity.content());
+      PolicyVersionPO initialVersion =
+          PolicyVersionPO.builder()
+              .withMetalakeId(builder.getMetalakeId())
+              .withPolicyId(policyEntity.id())
+              .withVersion(INIT_VERSION)
+              .withPolicyComment(policyEntity.comment())
+              .withEnabled(policyEntity.enabled())
+              .withContent(contentJson)
+              .withDeletedAt(DEFAULT_DELETED_AT)
+              .build();
+
+      // Sorted so that the stored JSON has a consistent ordering.
+      LinkedHashSet<String> sortedObjectTypes =
+          policyEntity.supportedObjectTypes().stream()
+              .map(Enum::name)
+              .sorted()
+              .collect(Collectors.toCollection(LinkedHashSet::new));
+      String supportedObjectTypesJson =
+          JsonUtils.anyFieldMapper().writeValueAsString(sortedObjectTypes);
+      String auditInfoJson =
+          JsonUtils.anyFieldMapper().writeValueAsString(policyEntity.auditInfo());
+
+      builder.withPolicyId(policyEntity.id());
+      builder.withPolicyName(policyEntity.name());
+      builder.withPolicyType(policyEntity.policyType());
+      builder.withInheritable(policyEntity.inheritable());
+      builder.withExclusive(policyEntity.exclusive());
+      builder.withSupportedObjectTypes(supportedObjectTypesJson);
+      builder.withAuditInfo(auditInfoJson);
+      builder.withCurrentVersion(INIT_VERSION);
+      builder.withLastVersion(INIT_VERSION);
+      builder.withDeletedAt(DEFAULT_DELETED_AT);
+      builder.withPolicyVersionPO(initialVersion);
+      return builder.build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to serialize json object:", e);
+    }
+  }
+
public static OwnerRelPO initializeOwnerRelPOsWithVersion(
Long metalakeId,
String ownerType,
diff --git
a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
index e8d25169ce..e556cbce9f 100644
--- a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
@@ -101,6 +101,17 @@ public class NameIdentifierUtil {
return NameIdentifier.of(NamespaceUtil.ofTag(metalake), tagName);
}
+ /**
+ * Create the policy {@link NameIdentifier} with the given metalake and policy name.
+ *
+ * <p>The namespace part of the identifier is obtained from {@code NamespaceUtil.ofPolicy}.
+ *
+ * @param metalake The metalake name
+ * @param policyName The policy name
+ * @return the created policy {@link NameIdentifier}
+ */
+ public static NameIdentifier ofPolicy(String metalake, String policyName) {
+ return NameIdentifier.of(NamespaceUtil.ofPolicy(metalake), policyName);
+ }
+
/**
* Create the user {@link NameIdentifier} with the given metalake and
username.
*
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
index 7c0169ce6c..56337a18aa 100644
---
a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
@@ -30,6 +30,7 @@ import static org.apache.gravitino.Configs.ENTITY_STORE;
import static org.apache.gravitino.Configs.RELATIONAL_ENTITY_STORE;
import static org.apache.gravitino.SupportsRelationOperations.Type.OWNER_REL;
import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
+import static org.apache.gravitino.policy.Policy.SUPPORTS_ALL_OBJECT_TYPES;
import static org.junit.Assert.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -41,6 +42,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
@@ -72,6 +74,7 @@ import org.apache.gravitino.meta.FilesetEntity;
import org.apache.gravitino.meta.GroupEntity;
import org.apache.gravitino.meta.ModelEntity;
import org.apache.gravitino.meta.ModelVersionEntity;
+import org.apache.gravitino.meta.PolicyEntity;
import org.apache.gravitino.meta.RoleEntity;
import org.apache.gravitino.meta.SchemaEntity;
import org.apache.gravitino.meta.SchemaVersion;
@@ -79,6 +82,7 @@ import org.apache.gravitino.meta.TableEntity;
import org.apache.gravitino.meta.TagEntity;
import org.apache.gravitino.meta.TopicEntity;
import org.apache.gravitino.meta.UserEntity;
+import org.apache.gravitino.policy.PolicyContents;
import org.apache.gravitino.storage.RandomIdGenerator;
import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
import org.apache.gravitino.storage.relational.mapper.GroupMetaMapper;
@@ -96,9 +100,12 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.condition.EnabledIf;
import org.mockito.Mockito;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+// Avoid re-executing tests in subclasses
+@EnabledIf("isDirectlyRunningThisClass")
public class TestJDBCBackend {
private final String JDBC_STORE_PATH =
"/tmp/gravitino_jdbc_entityStore_" +
UUID.randomUUID().toString().replace("-", "");
@@ -110,6 +117,10 @@ public class TestJDBCBackend {
Configs.DEFAULT_ENTITY_RELATIONAL_STORE,
JDBCBackend.class.getCanonicalName());
protected RelationalBackend backend;
+ // Condition method referenced by the class-level @EnabledIf("isDirectlyRunningThisClass").
+ // NOTE(review): MethodHandles.lookup() creates a lookup for the class that contains the call,
+ // which is always TestJDBCBackend here, so this comparison looks like it is always true even
+ // when a subclass is being run. Confirm that it really prevents re-execution in subclasses.
+ static boolean isDirectlyRunningThisClass() {
+ return MethodHandles.lookup().lookupClass() == TestJDBCBackend.class;
+ }
+
@BeforeAll
public void setup() {
File dir = new File(DB_DIR);
@@ -215,6 +226,21 @@ public class TestJDBCBackend {
backend.insert(metalake, false);
assertThrows(EntityAlreadyExistsException.class, () ->
backend.insert(metalakeCopy, false));
+ PolicyEntity policy =
+ createPolicy(
+ RandomIdGenerator.INSTANCE.nextId(),
+ NamespaceUtil.ofPolicy("metalake"),
+ "policy",
+ auditInfo);
+ PolicyEntity policyCopy =
+ createPolicy(
+ RandomIdGenerator.INSTANCE.nextId(),
+ NamespaceUtil.ofPolicy("metalake"),
+ "policy",
+ auditInfo);
+ backend.insert(policy, false);
+ assertThrows(EntityAlreadyExistsException.class, () ->
backend.insert(policyCopy, false));
+
CatalogEntity catalog =
createCatalog(
RandomIdGenerator.INSTANCE.nextId(),
@@ -333,6 +359,28 @@ public class TestJDBCBackend {
Entity.EntityType.METALAKE,
e -> createBaseMakeLake(metalakeCopy.id(), "metalake",
auditInfo)));
+ PolicyEntity policy =
+ createPolicy(
+ RandomIdGenerator.INSTANCE.nextId(),
+ NamespaceUtil.ofPolicy("metalake"),
+ "policy",
+ auditInfo);
+ PolicyEntity policy1 =
+ createPolicy(
+ RandomIdGenerator.INSTANCE.nextId(),
+ NamespaceUtil.ofPolicy("metalake"),
+ "policy1",
+ auditInfo);
+ backend.insert(policy, false);
+ backend.insert(policy1, false);
+ assertThrows(
+ EntityAlreadyExistsException.class,
+ () ->
+ backend.update(
+ policy1.nameIdentifier(),
+ Entity.EntityType.POLICY,
+ e -> createPolicy(policy1.id(), policy1.namespace(), "policy",
auditInfo)));
+
CatalogEntity catalog =
createCatalog(
RandomIdGenerator.INSTANCE.nextId(),
@@ -550,6 +598,30 @@ public class TestJDBCBackend {
createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), "metalake",
auditInfo);
backend.insert(metalake, false);
+ PolicyEntity policy =
+ createPolicy(
+ RandomIdGenerator.INSTANCE.nextId(),
+ NamespaceUtil.ofPolicy("metalake"),
+ "policy",
+ auditInfo);
+ backend.insert(policy, false);
+ // update policy enabled and version
+ PolicyEntity policyV2 =
+ PolicyEntity.builder()
+ .withId(policy.id())
+ .withNamespace(policy.namespace())
+ .withName(policy.name())
+ .withPolicyType(policy.policyType())
+ .withComment(policy.comment())
+ .withEnabled(!policy.enabled())
+ .withExclusive(policy.exclusive())
+ .withInheritable(policy.inheritable())
+ .withSupportedObjectTypes(policy.supportedObjectTypes())
+ .withContent(policy.content())
+ .withAuditInfo(auditInfo)
+ .build();
+ backend.update(policy.nameIdentifier(), Entity.EntityType.POLICY, e ->
policyV2);
+
CatalogEntity catalog =
createCatalog(
RandomIdGenerator.INSTANCE.nextId(),
@@ -655,6 +727,46 @@ public class TestJDBCBackend {
createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(),
"another-metalake", auditInfo);
backend.insert(anotherMetaLake, false);
+ PolicyEntity anotherPolicy =
+ createPolicy(
+ RandomIdGenerator.INSTANCE.nextId(),
+ NamespaceUtil.ofPolicy("another-metalake"),
+ "another-policy",
+ auditInfo);
+ backend.insert(anotherPolicy, false);
+ // update another policy enabled and version
+ PolicyEntity anotherPolicyV2 =
+ PolicyEntity.builder()
+ .withId(anotherPolicy.id())
+ .withNamespace(anotherPolicy.namespace())
+ .withName(anotherPolicy.name())
+ .withPolicyType(anotherPolicy.policyType())
+ .withComment(anotherPolicy.comment())
+ .withEnabled(!anotherPolicy.enabled())
+ .withExclusive(anotherPolicy.exclusive())
+ .withInheritable(anotherPolicy.inheritable())
+ .withSupportedObjectTypes(anotherPolicy.supportedObjectTypes())
+ .withContent(anotherPolicy.content())
+ .withAuditInfo(auditInfo)
+ .build();
+ backend.update(anotherPolicy.nameIdentifier(), Entity.EntityType.POLICY, e
-> anotherPolicyV2);
+ // update another policy comment and version
+ PolicyEntity anotherPolicyV3 =
+ PolicyEntity.builder()
+ .withId(anotherPolicy.id())
+ .withNamespace(anotherPolicy.namespace())
+ .withName(anotherPolicy.name())
+ .withPolicyType(anotherPolicy.policyType())
+ .withComment("v3")
+ .withEnabled(anotherPolicyV2.enabled())
+ .withExclusive(anotherPolicy.exclusive())
+ .withInheritable(anotherPolicy.inheritable())
+ .withSupportedObjectTypes(SUPPORTS_ALL_OBJECT_TYPES)
+ .withContent(anotherPolicy.content())
+ .withAuditInfo(auditInfo)
+ .build();
+ backend.update(anotherPolicy.nameIdentifier(), Entity.EntityType.POLICY, e
-> anotherPolicyV3);
+
CatalogEntity anotherCatalog =
createCatalog(
RandomIdGenerator.INSTANCE.nextId(),
@@ -743,6 +855,11 @@ public class TestJDBCBackend {
backend.list(metalake.namespace(), Entity.EntityType.METALAKE, true);
assertTrue(metaLakes.contains(metalake));
+ List<PolicyEntity> policies = backend.list(policy.namespace(),
Entity.EntityType.POLICY, true);
+ assertFalse(policies.contains(policy));
+ assertTrue(policies.contains(policyV2));
+ assertEquals(policyV2.enabled(),
policies.get(policies.indexOf(policyV2)).enabled());
+
List<CatalogEntity> catalogs =
backend.list(catalog.namespace(), Entity.EntityType.CATALOG, true);
assertTrue(catalogs.contains(catalog));
@@ -872,6 +989,9 @@ public class TestJDBCBackend {
assertFalse(backend.exists(metalake.nameIdentifier(),
Entity.EntityType.METALAKE));
assertTrue(backend.exists(anotherMetaLake.nameIdentifier(),
Entity.EntityType.METALAKE));
+ assertFalse(backend.exists(policy.nameIdentifier(),
Entity.EntityType.POLICY));
+ assertTrue(backend.exists(anotherPolicy.nameIdentifier(),
Entity.EntityType.POLICY));
+
assertFalse(backend.exists(catalog.nameIdentifier(),
Entity.EntityType.CATALOG));
assertTrue(backend.exists(anotherCatalog.nameIdentifier(),
Entity.EntityType.CATALOG));
@@ -911,6 +1031,7 @@ public class TestJDBCBackend {
// check legacy record after soft delete
assertTrue(legacyRecordExistsInDB(metalake.id(),
Entity.EntityType.METALAKE));
+ assertTrue(legacyRecordExistsInDB(policy.id(), Entity.EntityType.POLICY));
assertTrue(legacyRecordExistsInDB(catalog.id(),
Entity.EntityType.CATALOG));
assertTrue(legacyRecordExistsInDB(schema.id(), Entity.EntityType.SCHEMA));
assertTrue(legacyRecordExistsInDB(table.id(), Entity.EntityType.TABLE));
@@ -926,6 +1047,8 @@ public class TestJDBCBackend {
assertEquals(2, countRoleRels(anotherRole.id()));
assertEquals(2, listFilesetVersions(fileset.id()).size());
assertEquals(3, listFilesetVersions(anotherFileset.id()).size());
+ assertEquals(2, listPolicyVersions(policy.id()).size());
+ assertEquals(3, listPolicyVersions(anotherPolicy.id()).size());
assertTrue(legacyRecordExistsInDB(tag.id(), Entity.EntityType.TAG));
// meta data hard delete
@@ -942,9 +1065,11 @@ public class TestJDBCBackend {
assertFalse(legacyRecordExistsInDB(role.id(), Entity.EntityType.ROLE));
assertFalse(legacyRecordExistsInDB(user.id(), Entity.EntityType.USER));
assertFalse(legacyRecordExistsInDB(group.id(), Entity.EntityType.GROUP));
+ assertFalse(legacyRecordExistsInDB(policy.id(), Entity.EntityType.POLICY));
assertEquals(0, countRoleRels(role.id()));
assertEquals(2, countRoleRels(anotherRole.id()));
assertEquals(0, listFilesetVersions(fileset.id()).size());
+ assertEquals(0, listPolicyVersions(policy.id()).size());
assertFalse(legacyRecordExistsInDB(tag.id(), Entity.EntityType.TAG));
assertEquals(0, countOwnerRel(metalake.id()));
assertEquals(1, countOwnerRel(anotherMetaLake.id()));
@@ -962,6 +1087,16 @@ public class TestJDBCBackend {
// hard delete for old version fileset
backend.hardDeleteLegacyData(Entity.EntityType.FILESET,
Instant.now().toEpochMilli() + 1000);
assertEquals(1, listFilesetVersions(anotherFileset.id()).size());
+
+ // soft delete for old version policy
+ assertEquals(3, listPolicyVersions(anotherPolicy.id()).size());
+ for (Entity.EntityType entityType : Entity.EntityType.values()) {
+ backend.deleteOldVersionData(entityType, 1);
+ }
+ Map<Integer, Long> versionDeletedMap2 =
listPolicyVersions(anotherPolicy.id());
+ assertEquals(3, versionDeletedMap2.size());
+ assertEquals(1, versionDeletedMap2.values().stream().filter(value -> value
== 0L).count());
+ assertEquals(2, versionDeletedMap2.values().stream().filter(value -> value
!= 0L).count());
}
@Test
@@ -1105,6 +1240,10 @@ public class TestJDBCBackend {
tableName = "tag_meta";
idColumnName = "tag_id";
break;
+ case POLICY:
+ tableName = "policy_meta";
+ idColumnName = "policy_id";
+ break;
default:
throw new IllegalArgumentException("Unsupported entity type: " +
entityType);
}
@@ -1144,6 +1283,26 @@ public class TestJDBCBackend {
return versionDeletedTime;
}
+ /**
+ * Read all rows of {@code policy_version_info} for the given policy directly over JDBC.
+ *
+ * <p>The query does not filter on {@code deleted_at}, so every stored version row of the policy
+ * is returned regardless of that column's value.
+ *
+ * @param policyId the id of the policy whose versions are listed
+ * @return a map from version number to that row's {@code deleted_at} value
+ * @throws RuntimeException if the SQL query fails
+ */
+ private Map<Integer, Long> listPolicyVersions(Long policyId) {
+ Map<Integer, Long> versionDeletedTime = new HashMap<>();
+ try (SqlSession sqlSession =
+
SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true);
+ Connection connection = sqlSession.getConnection();
+ Statement statement = connection.createStatement();
+ ResultSet rs =
+ statement.executeQuery(
+ String.format(
+ "SELECT version, deleted_at FROM policy_version_info WHERE
policy_id = %d",
+ policyId))) {
+ while (rs.next()) {
+ versionDeletedTime.put(rs.getInt("version"), rs.getLong("deleted_at"));
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("SQL execution failed", e);
+ }
+ return versionDeletedTime;
+ }
+
private Integer countRoleRels(Long roleId) {
int count = 0;
try (SqlSession sqlSession =
@@ -1329,6 +1488,22 @@ public class TestJDBCBackend {
.build();
}
+ /**
+ * Build a {@link PolicyEntity} test fixture.
+ *
+ * <p>Apart from the arguments, the entity always uses policy type {@code "test"}, an empty
+ * comment, {@code enabled}, {@code exclusive} and {@code inheritable} all set to {@code true},
+ * {@code SUPPORTS_ALL_OBJECT_TYPES}, and a custom content built from a single-entry map.
+ *
+ * @param id the policy id
+ * @param ns the namespace of the policy
+ * @param name the policy name
+ * @param auditInfo the audit info to attach
+ * @return the built policy entity
+ */
+ public static PolicyEntity createPolicy(Long id, Namespace ns, String name,
AuditInfo auditInfo) {
+ return PolicyEntity.builder()
+ .withId(id)
+ .withNamespace(ns)
+ .withName(name)
+ .withPolicyType("test")
+ .withComment("")
+ .withEnabled(true)
+ .withExclusive(true)
+ .withInheritable(true)
+ .withSupportedObjectTypes(SUPPORTS_ALL_OBJECT_TYPES)
+ .withContent(PolicyContents.custom(ImmutableMap.of("filed1", 123),
null))
+ .withAuditInfo(auditInfo)
+ .build();
+ }
+
public static SchemaEntity createSchemaEntity(
Long id, Namespace namespace, String name, AuditInfo auditInfo) {
return SchemaEntity.builder()
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPolicyMetaService.java
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPolicyMetaService.java
new file mode 100644
index 0000000000..5f89469fb9
--- /dev/null
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPolicyMetaService.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.PolicyEntity;
+import org.apache.gravitino.policy.PolicyContent;
+import org.apache.gravitino.policy.PolicyContents;
+import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.storage.relational.TestJDBCBackend;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link PolicyMetaService}: insert and get, list, update, delete, and the removal of
+ * policies when their metalake is deleted.
+ *
+ * <p>Extends {@link TestJDBCBackend} and uses its {@code backend} field and its
+ * {@code createBaseMakeLake} helper to create the metalake each test works in.
+ */
+public class TestPolicyMetaService extends TestJDBCBackend {
+ private final String metalakeName = "metalake_for_policy_test";
+
+ private final AuditInfo auditInfo =
+
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
+
+ private final PolicyContent content =
PolicyContents.custom(ImmutableMap.of("filed1", 123), null);
+
+ private final Set<MetadataObject.Type> supportedObjectTypes =
+ new HashSet<MetadataObject.Type>() {
+ {
+ add(MetadataObject.Type.CATALOG);
+ add(MetadataObject.Type.SCHEMA);
+ }
+ };
+
+ /**
+ * Covers three cases: getting a policy that does not exist, inserting a policy and reading it
+ * back, and inserting an entity that reuses an existing id with and without overwrite.
+ */
+ @Test
+ public void testInsertAndGetPolicyByIdentifier() throws IOException {
+ BaseMetalake metalake =
+ createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName,
auditInfo);
+ backend.insert(metalake, false);
+
+ // Test no policy entity.
+ PolicyMetaService policyMetaService = PolicyMetaService.getInstance();
+ Exception excep =
+ Assertions.assertThrows(
+ NoSuchEntityException.class,
+ () ->
+ policyMetaService.getPolicyByIdentifier(
+ NameIdentifierUtil.ofPolicy(metalakeName, "policy1")));
+ Assertions.assertEquals("No such policy entity: policy1",
excep.getMessage());
+
+ // Test get policy entity
+ PolicyEntity policyEntity =
+ PolicyEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName("policy1")
+ .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+ .withComment("comment")
+ .withPolicyType("test")
+ .withContent(content)
+ .withEnabled(true)
+ .withSupportedObjectTypes(supportedObjectTypes)
+ .withAuditInfo(auditInfo)
+ .build();
+ policyMetaService.insertPolicy(policyEntity, false);
+
+ PolicyEntity resultpolicyEntity =
+ policyMetaService.getPolicyByIdentifier(
+ NameIdentifierUtil.ofPolicy(metalakeName, "policy1"));
+ Assertions.assertEquals(policyEntity, resultpolicyEntity);
+
+ // Test with null comment and content properties.
+ PolicyEntity PolicyEntity1 =
+ PolicyEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName("policy2")
+ .withPolicyType("test")
+ .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+ .withAuditInfo(auditInfo)
+ .withSupportedObjectTypes(supportedObjectTypes)
+ .withContent(content)
+ .build();
+
+ policyMetaService.insertPolicy(PolicyEntity1, false);
+ PolicyEntity resultPolicyEntity1 =
+ policyMetaService.getPolicyByIdentifier(
+ NameIdentifierUtil.ofPolicy(metalakeName, "policy2"));
+ Assertions.assertEquals(PolicyEntity1, resultPolicyEntity1);
+ Assertions.assertTrue(resultPolicyEntity1.enabled());
+ Assertions.assertNull(resultPolicyEntity1.comment());
+ Assertions.assertEquals(supportedObjectTypes,
resultPolicyEntity1.supportedObjectTypes());
+ Assertions.assertNull(resultPolicyEntity1.content().properties());
+
+ // Test insert with overwrite.
+ // PolicyEntity2 reuses the id of PolicyEntity1 under a new name: the insert is expected to
+ // fail without overwrite and to succeed with it.
+ PolicyEntity PolicyEntity2 =
+ PolicyEntity.builder()
+ .withId(PolicyEntity1.id())
+ .withName("policy3")
+ .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+ .withComment("comment")
+ .withPolicyType("test")
+ .withSupportedObjectTypes(supportedObjectTypes)
+ .withContent(content)
+ .withAuditInfo(auditInfo)
+ .build();
+
+ Assertions.assertThrows(
+ Exception.class, () -> policyMetaService.insertPolicy(PolicyEntity2,
false));
+
+ policyMetaService.insertPolicy(PolicyEntity2, true);
+
+ PolicyEntity resultPolicyEntity2 =
+ policyMetaService.getPolicyByIdentifier(
+ NameIdentifierUtil.ofPolicy(metalakeName, "policy3"));
+ Assertions.assertEquals(PolicyEntity2, resultPolicyEntity2);
+ }
+
+ /** Inserts two policies into one metalake and checks that listing the namespace returns both. */
+ @Test
+ public void testCreateAndListPolicies() throws IOException {
+ BaseMetalake metalake =
+ createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName,
auditInfo);
+ backend.insert(metalake, false);
+
+ PolicyMetaService policyMetaService = PolicyMetaService.getInstance();
+ PolicyEntity policyEntity1 =
+ PolicyEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName("policy1")
+ .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+ .withComment("comment")
+ .withPolicyType("test")
+ .withContent(content)
+ .withSupportedObjectTypes(supportedObjectTypes)
+ .withAuditInfo(auditInfo)
+ .build();
+ policyMetaService.insertPolicy(policyEntity1, false);
+
+ PolicyEntity policyEntity2 =
+ PolicyEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName("policy2")
+ .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+ .withComment("comment")
+ .withPolicyType("test")
+ .withContent(content)
+ .withSupportedObjectTypes(supportedObjectTypes)
+ .withAuditInfo(auditInfo)
+ .build();
+ policyMetaService.insertPolicy(policyEntity2, false);
+
+ List<PolicyEntity> policyEntities =
+
policyMetaService.listPoliciesByNamespace(NamespaceUtil.ofPolicy(metalakeName));
+ Assertions.assertEquals(2, policyEntities.size());
+ Assertions.assertTrue(policyEntities.contains(policyEntity1));
+ Assertions.assertTrue(policyEntities.contains(policyEntity2));
+ }
+
+ /**
+ * Covers three update cases: updating a policy that does not exist, a successful update of the
+ * comment, and an update whose result carries a different id than the stored entity.
+ */
+ @Test
+ public void testUpdatePolicy() throws IOException {
+ BaseMetalake metalake =
+ createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName,
auditInfo);
+ backend.insert(metalake, false);
+
+ PolicyMetaService policyMetaService = PolicyMetaService.getInstance();
+ PolicyEntity policyEntity1 =
+ PolicyEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName("policy1")
+ .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+ .withComment("comment")
+ .withPolicyType("test")
+ .withSupportedObjectTypes(supportedObjectTypes)
+ .withContent(content)
+ .withAuditInfo(auditInfo)
+ .build();
+ policyMetaService.insertPolicy(policyEntity1, false);
+
+ // Update with no policy entity.
+ Exception excep =
+ Assertions.assertThrows(
+ NoSuchEntityException.class,
+ () ->
+ policyMetaService.updatePolicy(
+ NameIdentifierUtil.ofPolicy(metalakeName, "policy2"),
+ policyEntity -> policyEntity));
+ Assertions.assertEquals("No such policy entity: policy2",
excep.getMessage());
+
+ // Update policy entity.
+ PolicyEntity policyEntity2 =
+ PolicyEntity.builder()
+ .withId(policyEntity1.id())
+ .withName("policy1")
+ .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+ .withComment("comment1")
+ .withPolicyType("test")
+ .withSupportedObjectTypes(supportedObjectTypes)
+ .withContent(content)
+ .withAuditInfo(auditInfo)
+ .build();
+ PolicyEntity updatedPolicyEntity =
+ policyMetaService.updatePolicy(
+ NameIdentifierUtil.ofPolicy(metalakeName, "policy1"), policyEntity
-> policyEntity2);
+ Assertions.assertEquals(policyEntity2, updatedPolicyEntity);
+
+ PolicyEntity loadedPolicyEntity =
+ policyMetaService.getPolicyByIdentifier(
+ NameIdentifierUtil.ofPolicy(metalakeName, "policy1"));
+ Assertions.assertEquals(policyEntity2, loadedPolicyEntity);
+
+ // Update with different id.
+ PolicyEntity policyEntity3 =
+ PolicyEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName("policy1")
+ .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+ .withComment("comment1")
+ .withPolicyType("test")
+ .withSupportedObjectTypes(supportedObjectTypes)
+ .withContent(content)
+ .withAuditInfo(auditInfo)
+ .build();
+
+ Exception excep1 =
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ policyMetaService.updatePolicy(
+ NameIdentifierUtil.ofPolicy(metalakeName, "policy1"),
+ policyEntity -> policyEntity3));
+ Assertions.assertEquals(
+ "The updated policy entity id: "
+ + policyEntity3.id()
+ + " must have the same id as the old "
+ + "entity id "
+ + policyEntity2.id(),
+ excep1.getMessage());
+
+ // The rejected update must leave the stored entity unchanged.
+ PolicyEntity loadedPolicyEntity1 =
+ policyMetaService.getPolicyByIdentifier(
+ NameIdentifierUtil.ofPolicy(metalakeName, "policy1"));
+ Assertions.assertEquals(policyEntity2, loadedPolicyEntity1);
+ }
+
+ /**
+ * Deletes a policy and checks that the first delete returns true, a repeated delete returns
+ * false, and a later get throws {@link NoSuchEntityException}.
+ */
+ @Test
+ public void testDeletePolicy() throws IOException {
+ BaseMetalake metalake =
+ createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName,
auditInfo);
+ backend.insert(metalake, false);
+
+ PolicyMetaService policyMetaService = PolicyMetaService.getInstance();
+ PolicyEntity policyEntity1 =
+ PolicyEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName("policy1")
+ .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+ .withComment("comment")
+ .withPolicyType("test")
+ .withSupportedObjectTypes(supportedObjectTypes)
+ .withContent(content)
+ .withAuditInfo(auditInfo)
+ .build();
+ policyMetaService.insertPolicy(policyEntity1, false);
+
+ boolean deleted =
+
policyMetaService.deletePolicy(NameIdentifierUtil.ofPolicy(metalakeName,
"policy1"));
+ Assertions.assertTrue(deleted);
+
+ deleted =
policyMetaService.deletePolicy(NameIdentifierUtil.ofPolicy(metalakeName,
"policy1"));
+ Assertions.assertFalse(deleted);
+
+ Exception excep =
+ Assertions.assertThrows(
+ NoSuchEntityException.class,
+ () ->
+ policyMetaService.getPolicyByIdentifier(
+ NameIdentifierUtil.ofPolicy(metalakeName, "policy1")));
+ Assertions.assertEquals("No such policy entity: policy1",
excep.getMessage());
+ }
+
+ /**
+ * Checks that a policy can no longer be fetched once its metalake has been deleted, for
+ * {@code deleteMetalake} called with {@code false} and with {@code true} as second argument.
+ */
+ @Test
+ public void testDeleteMetalake() throws IOException {
+ BaseMetalake metalake =
+ createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName,
auditInfo);
+ backend.insert(metalake, false);
+
+ PolicyMetaService policyMetaService = PolicyMetaService.getInstance();
+ PolicyEntity policyEntity1 =
+ PolicyEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName("policy1")
+ .withNamespace(NamespaceUtil.ofPolicy(metalakeName))
+ .withComment("comment")
+ .withPolicyType("test")
+ .withSupportedObjectTypes(supportedObjectTypes)
+ .withContent(content)
+ .withAuditInfo(auditInfo)
+ .build();
+ policyMetaService.insertPolicy(policyEntity1, false);
+
+ Assertions.assertTrue(
+
MetalakeMetaService.getInstance().deleteMetalake(metalake.nameIdentifier(),
false));
+ Assertions.assertThrows(
+ NoSuchEntityException.class,
+ () ->
+ policyMetaService.getPolicyByIdentifier(
+ NameIdentifierUtil.ofPolicy(metalakeName, "policy1")));
+
+ // Test delete metalake with cascade.
+ BaseMetalake metalake1 =
+ createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName +
"1", auditInfo);
+ backend.insert(metalake1, false);
+
+ PolicyEntity policyEntity2 =
+ PolicyEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName("policy2")
+ .withNamespace(NamespaceUtil.ofPolicy(metalakeName + "1"))
+ .withComment("comment")
+ .withPolicyType("test")
+ .withSupportedObjectTypes(supportedObjectTypes)
+ .withContent(content)
+ .withAuditInfo(auditInfo)
+ .build();
+
+ policyMetaService.insertPolicy(policyEntity2, false);
+ Assertions.assertTrue(
+
MetalakeMetaService.getInstance().deleteMetalake(metalake1.nameIdentifier(),
true));
+ Assertions.assertThrows(
+ NoSuchEntityException.class,
+ () ->
+ policyMetaService.getPolicyByIdentifier(
+ NameIdentifierUtil.ofPolicy(metalakeName + "1", "policy2")));
+ }
+}