mansehajsingh commented on code in PR #4:
URL: https://github.com/apache/polaris-tools/pull/4#discussion_r2035922780


##########
polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/PolarisSynchronizer.java:
##########
@@ -0,0 +1,1135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.tools.sync.polaris;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.admin.model.CatalogRole;
+import org.apache.polaris.core.admin.model.GrantResource;
+import org.apache.polaris.core.admin.model.PrincipalRole;
+import org.apache.polaris.core.admin.model.PrincipalWithCredentials;
+import org.apache.polaris.tools.sync.polaris.access.AccessControlService;
+import org.apache.polaris.tools.sync.polaris.catalog.BaseTableWithETag;
+import org.apache.polaris.tools.sync.polaris.catalog.ETagService;
+import org.apache.polaris.tools.sync.polaris.catalog.NotModifiedException;
+import org.apache.polaris.tools.sync.polaris.catalog.PolarisCatalog;
+import org.apache.polaris.tools.sync.polaris.planning.SynchronizationPlanner;
+import org.apache.polaris.tools.sync.polaris.planning.plan.SynchronizationPlan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates idempotent and failure-safe logic to perform Polaris entity 
syncs. Performs logging
+ * with configurability and all actions related to the generated sync plans.
+ */
+public class PolarisSynchronizer {
+
+  private final Logger clientLogger;
+
+  private final SynchronizationPlanner syncPlanner;
+
+  private final PolarisService source;
+
+  private final PolarisService target;
+
+  private final PrincipalWithCredentials sourceOmnipotentPrincipal;
+
+  private final PrincipalWithCredentials targetOmnipotentPrincipal;
+
+  private final PrincipalRole sourceOmnipotentPrincipalRole;
+
+  private final PrincipalRole targetOmnipotentPrincipalRole;
+
+  private final AccessControlService sourceAccessControlService;
+
+  private final AccessControlService targetAccessControlService;
+
+  private final ETagService etagService;
+
+  public PolarisSynchronizer(
+      Logger clientLogger,
+      SynchronizationPlanner synchronizationPlanner,
+      PrincipalWithCredentials sourceOmnipotentPrincipal,
+      PrincipalWithCredentials targetOmnipotentPrincipal,
+      PolarisService source,
+      PolarisService target,
+      ETagService etagService) {
+    this.clientLogger =
+        clientLogger == null ? 
LoggerFactory.getLogger(PolarisSynchronizer.class) : clientLogger;
+    this.syncPlanner = synchronizationPlanner;
+    this.sourceOmnipotentPrincipal = sourceOmnipotentPrincipal;
+    this.targetOmnipotentPrincipal = targetOmnipotentPrincipal;
+    this.source = source;
+    this.target = target;
+    this.sourceAccessControlService = new AccessControlService(source);
+    this.targetAccessControlService = new AccessControlService(target);
+
+    this.sourceOmnipotentPrincipalRole =
+        sourceAccessControlService.getOmnipotentPrincipalRoleForPrincipal(
+            sourceOmnipotentPrincipal.getPrincipal().getName());
+    this.targetOmnipotentPrincipalRole =
+        targetAccessControlService.getOmnipotentPrincipalRoleForPrincipal(
+            targetOmnipotentPrincipal.getPrincipal().getName());
+    this.etagService = etagService;
+  }
+
+  /**
+   * Calculates the total number of sync tasks to complete.
+   *
+   * @param plan the plan to scan for cahnges
+   * @return the nuber of syncs to perform
+   */
+  private int totalSyncsToComplete(SynchronizationPlan<?> plan) {
+    return plan.entitiesToCreate().size()
+        + plan.entitiesToOverwrite().size()
+        + plan.entitiesToRemove().size();
+  }
+
+  /** Sync principal roles from source to target. */
+  public void syncPrincipalRoles() {
+    List<PrincipalRole> principalRolesSource;
+
+    try {
+      principalRolesSource = source.listPrincipalRoles();
+      clientLogger.info("Listed {} principal-roles from source.", 
principalRolesSource.size());
+    } catch (Exception e) {
+      clientLogger.error("Failed to list principal-roles from source.", e);
+      return;
+    }
+
+    List<PrincipalRole> principalRolesTarget;
+
+    try {
+      principalRolesTarget = target.listPrincipalRoles();
+      clientLogger.info("Listed {} principal-roles from target.", 
principalRolesTarget.size());
+    } catch (Exception e) {
+      clientLogger.error("Failed to list principal-roles from target.", e);
+      return;
+    }
+
+    SynchronizationPlan<PrincipalRole> principalRoleSyncPlan =
+        syncPlanner.planPrincipalRoleSync(principalRolesSource, 
principalRolesTarget);
+
+    principalRoleSyncPlan
+        .entitiesToSkip()
+        .forEach(
+            principalRole ->
+                clientLogger.info("Skipping principal-role {}.", 
principalRole.getName()));
+
+    principalRoleSyncPlan
+        .entitiesNotModified()
+        .forEach(
+            principalRole ->
+                clientLogger.info(
+                    "No change detected for principal-role {}, skipping.",
+                    principalRole.getName()));
+
+    int syncsCompleted = 0;
+    final int totalSyncsToComplete = 
totalSyncsToComplete(principalRoleSyncPlan);
+
+    for (PrincipalRole principalRole : 
principalRoleSyncPlan.entitiesToCreate()) {
+      try {
+        target.createPrincipalRole(principalRole, false);
+        clientLogger.info(
+            "Created principal-role {} on target. - {}/{}",
+            principalRole.getName(),
+            ++syncsCompleted,
+            totalSyncsToComplete);
+      } catch (Exception e) {
+        clientLogger.error(
+            "Failed to create principal-role {} on target. - {}/{}",
+            principalRole.getName(),
+            ++syncsCompleted,
+            totalSyncsToComplete,
+            e);
+      }
+    }
+
+    for (PrincipalRole principalRole : 
principalRoleSyncPlan.entitiesToOverwrite()) {
+      try {
+        target.createPrincipalRole(principalRole, true);
+        clientLogger.info(
+            "Overwrote principal-role {} on target. - {}/{}",
+            principalRole.getName(),
+            ++syncsCompleted,
+            totalSyncsToComplete);
+      } catch (Exception e) {
+        clientLogger.error(
+            "Failed to overwrite principal-role {} on target. - {}/{}",
+            principalRole.getName(),
+            ++syncsCompleted,
+            totalSyncsToComplete,
+            e);
+      }
+    }
+
+    for (PrincipalRole principalRole : 
principalRoleSyncPlan.entitiesToRemove()) {
+      try {
+        target.removePrincipalRole(principalRole.getName());
+        clientLogger.info(
+            "Removed principal-role {} on target. - {}/{}",
+            principalRole.getName(),
+            ++syncsCompleted,
+            totalSyncsToComplete);
+      } catch (Exception e) {
+        clientLogger.error(
+            "Failed to remove principal-role {} on target. - {}/{}",
+            principalRole.getName(),
+            ++syncsCompleted,
+            totalSyncsToComplete,
+            e);
+      }
+    }
+  }
+
+  /**
+   * Sync assignments of principal roles to a catalog role.
+   *
+   * @param catalogName the catalog that the catalog role is in
+   * @param catalogRoleName the name of the catalog role
+   */
+  public void syncAssigneePrincipalRolesForCatalogRole(String catalogName, 
String catalogRoleName) {
+    List<PrincipalRole> principalRolesSource;
+
+    try {
+      principalRolesSource =
+          source.listAssigneePrincipalRolesForCatalogRole(catalogName, 
catalogRoleName);
+      clientLogger.info(
+          "Listed {} assignee principal-roles for catalog-role {} in catalog 
{} from source.",
+          principalRolesSource.size(),
+          catalogRoleName,
+          catalogName);
+    } catch (Exception e) {
+      clientLogger.error(
+          "Failed to list assignee principal-roles for catalog-role {} in 
catalog {} from source.",
+          catalogRoleName,
+          catalogName,
+          e);
+      return;
+    }
+
+    List<PrincipalRole> principalRolesTarget;
+
+    try {
+      principalRolesTarget =
+          target.listAssigneePrincipalRolesForCatalogRole(catalogName, 
catalogRoleName);
+      clientLogger.info(
+          "Listed {} assignee principal-roles for catalog-role {} in catalog 
{} from target.",
+          principalRolesTarget.size(),
+          catalogRoleName,
+          catalogName);
+    } catch (Exception e) {
+      clientLogger.error(
+          "Failed to list assignee principal-roles for catalog-role {} in 
catalog {} from target.",
+          catalogRoleName,
+          catalogName,
+          e);
+      return;
+    }
+
+    SynchronizationPlan<PrincipalRole> assignedPrincipalRoleSyncPlan =
+        syncPlanner.planAssignPrincipalRolesToCatalogRolesSync(
+            catalogName, catalogRoleName, principalRolesSource, 
principalRolesTarget);
+
+    assignedPrincipalRoleSyncPlan
+        .entitiesToSkip()
+        .forEach(
+            principalRole ->
+                clientLogger.info(
+                    "Skipping assignment of principal-role {} to catalog-role 
{} in catalog {}.",
+                    principalRole.getName(),
+                    catalogRoleName,
+                    catalogName));
+
+    assignedPrincipalRoleSyncPlan
+        .entitiesNotModified()
+        .forEach(
+            principalRole ->
+                clientLogger.info(
+                    "Principal-role {} is already assigned to catalog-role {} 
in catalog {}. Skipping.",
+                    principalRole.getName(),
+                    catalogRoleName,
+                    catalogName));
+
+    int syncsCompleted = 0;
+    int totalSyncsToComplete = 
totalSyncsToComplete(assignedPrincipalRoleSyncPlan);
+
+    for (PrincipalRole principalRole : 
assignedPrincipalRoleSyncPlan.entitiesToCreate()) {
+      try {
+        target.assignCatalogRoleToPrincipalRole(
+            principalRole.getName(), catalogName, catalogRoleName);
+        clientLogger.info(
+            "Assigned principal-role {} to catalog-role {} in catalog {}. - 
{}/{}",
+            principalRole.getName(),
+            catalogRoleName,
+            catalogName,
+            ++syncsCompleted,
+            totalSyncsToComplete);
+      } catch (Exception e) {
+        clientLogger.error(
+            "Failed to assign principal-role {} to catalog-role {} in catalog 
{}. - {}/{}",
+            principalRole.getName(),
+            catalogRoleName,
+            catalogName,
+            ++syncsCompleted,
+            totalSyncsToComplete,
+            e);
+      }
+    }
+
+    for (PrincipalRole principalRole : 
assignedPrincipalRoleSyncPlan.entitiesToOverwrite()) {
+      try {
+        target.assignCatalogRoleToPrincipalRole(
+            principalRole.getName(), catalogName, catalogRoleName);
+        clientLogger.info(
+            "Assigned principal-role {} to catalog-role {} in catalog {}. - 
{}/{}",
+            principalRole.getName(),
+            catalogRoleName,
+            catalogName,
+            ++syncsCompleted,
+            totalSyncsToComplete);
+      } catch (Exception e) {
+        clientLogger.error(
+            "Failed to assign principal-role {} to catalog-role {} in catalog 
{}. - {}/{}",
+            principalRole.getName(),
+            catalogRoleName,
+            catalogName,
+            ++syncsCompleted,
+            totalSyncsToComplete,
+            e);
+      }
+    }
+
+    for (PrincipalRole principalRole : 
assignedPrincipalRoleSyncPlan.entitiesToRemove()) {
+      try {
+        target.removeCatalogRoleFromPrincipalRole(
+            principalRole.getName(), catalogName, catalogRoleName);
+        clientLogger.info(
+            "Revoked principal-role {} from catalog-role {} in catalog {}. - 
{}/{}",
+            principalRole.getName(),
+            catalogRoleName,
+            catalogName,
+            ++syncsCompleted,
+            totalSyncsToComplete);
+      } catch (Exception e) {
+        clientLogger.error(
+            "Failed to revoke principal-role {} from catalog-role {} in 
catalog {}. - {}/{}",
+            principalRole.getName(),
+            catalogRoleName,
+            catalogName,
+            ++syncsCompleted,
+            totalSyncsToComplete,
+            e);
+      }
+    }
+  }
+
+  /** Sync catalogs across the source and target polaris instance. */
+  public void syncCatalogs() {
+    List<Catalog> catalogsSource;
+
+    try {
+      catalogsSource = source.listCatalogs();
+      clientLogger.info("Listed {} catalogs from source.", 
catalogsSource.size());
+    } catch (Exception e) {
+      clientLogger.error("Failed to list catalogs from source.", e);
+      return;
+    }
+
+    List<Catalog> catalogsTarget;
+
+    try {
+      catalogsTarget = target.listCatalogs();
+      clientLogger.info("Listed {} catalogs from target.", 
catalogsTarget.size());
+    } catch (Exception e) {
+      clientLogger.error("Failed to list catalogs from target.", e);
+      return;
+    }
+
+    SynchronizationPlan<Catalog> catalogSyncPlan =
+        syncPlanner.planCatalogSync(catalogsSource, catalogsTarget);
+
+    catalogSyncPlan
+        .entitiesToSkip()
+        .forEach(catalog -> clientLogger.info("Skipping catalog {}.", 
catalog.getName()));
+
+    catalogSyncPlan
+        .entitiesToSkipAndSkipChildren()
+        .forEach(
+            catalog ->
+                clientLogger.info(
+                    "Skipping catalog {} and all child entities.", 
catalog.getName()));
+
+    catalogSyncPlan
+        .entitiesNotModified()
+        .forEach(
+            catalog ->
+                clientLogger.info(
+                    "No change detected in catalog {}. Skipping.", 
catalog.getName()));
+
+    int syncsCompleted = 0;
+    int totalSyncsToComplete = totalSyncsToComplete(catalogSyncPlan);
+
+    for (Catalog catalog : catalogSyncPlan.entitiesToCreate()) {
+      try {
+        target.createCatalog(catalog);
+        clientLogger.info(
+            "Created catalog {}. - {}/{}",
+            catalog.getName(),
+            ++syncsCompleted,
+            totalSyncsToComplete);
+      } catch (Exception e) {
+        clientLogger.error(
+            "Failed to create catalog {}. - {}/{}",
+            catalog.getName(),
+            ++syncsCompleted,
+            totalSyncsToComplete,
+            e);
+      }
+    }
+
+    for (Catalog catalog : catalogSyncPlan.entitiesToOverwrite()) {
+      try {
+        setupOmnipotentCatalogRoleIfNotExistsTarget(catalog.getName());

Review Comment:
   This is because overwriting the catalog requires us to perform a cascading
drop of the catalog. We only need to set up this omnipotent principal when we
are initializing an Iceberg REST client. On createCatalog, we don't need the
omnipotent principal until we sync namespaces and tables. On overwrite and
remove, however, we need to set it up beforehand so that we can drop the
catalog's internals, such as namespaces and tables.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@polaris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to