errose28 commented on a change in pull request #3131: URL: https://github.com/apache/ozone/pull/3131#discussion_r822189574
########## File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSync.java ########## @@ -0,0 +1,475 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.om.multitenant; + +import static java.lang.Thread.sleep; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT; + +import java.io.Closeable; +import java.io.IOException; +import java.util.HashSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ConcurrentHashMultiset; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +/** + * Background Sync thread that reads Multitnancy state from OMDB + * and applies it to Ranger. + */ +public class OMRangerBGSync implements Runnable, Closeable { + + private OzoneManager ozoneManager; + private OzoneClient ozoneClient; + + private static final Logger LOG = LoggerFactory + .getLogger(OMRangerBGSync.class); + private static int WAIT_MILI = 1000; + private static int MAX_ATTEMPT = 2; + + /** + * ExecutorService used for scheduling sync operation. + */ + private final ScheduledExecutorService executorService; + private ScheduledFuture<?> rangerSyncFuture; + private final int rangerSyncInterval; + + long rangerBGSyncCounter = 0; + long currentOzoneServiceVersionInOMDB; + long proposedOzoneServiceVersionInOMDB; + public static int ozoneServiceId; + + MultiTenantAccessAuthorizerRangerPlugin authorizer; + + class BGRole{ + String name; + String id; + HashSet<String> users; + + BGRole(String n) { + this.name = n; + users = new HashSet<>(); + } + + void addId(String i) { + this.id =i; + } + + void addUsers(String u) { + users.add(u); + } + + HashSet<String> getUsers() { + return users; + } + + @Override + public int hashCode() { + return name.hashCode(); + } + } + + // we will track all the policies in Ranger here. After we have + // processed all the policies from OMDB, this map will + // be left with policies that we need to delete. + // Its a map of Policy ID to policy names + ConcurrentHashMap<String, String> mtRangerPoliciesTobeDeleted = + new ConcurrentHashMap<>(); + + // This map will be used to keep all the policies that are found in + // OMDB and should have been in Ranger. Currently, we are only printing such + // policyID. This can result if a tenant is deleted but the system + // crashed. Its an easy recovery to retry the "tenant delete" operation. + // Its a map of policy ID to policy names + ConcurrentHashMap<String, String> mtRangerPoliciesTobeCreated = + new ConcurrentHashMap<>(); + + // This map will keep all the Multiotenancy related roles from Ranger. + ConcurrentHashMap<String, BGRole> mtRangerRoles = new ConcurrentHashMap<>(); + + // keep OMDB mapping of Roles -> list of user principals. + ConcurrentHashMap<String, HashSet<String>> mtOMDBRoles = + new ConcurrentHashMap<>(); + + // Every BG ranger sync cycle we update this + long lastRangerPolicyLoadTime; + + public OMRangerBGSync(OzoneManager om) throws Exception { + try { + ozoneManager = om; + authorizer = new MultiTenantAccessAuthorizerRangerPlugin(); + authorizer.init(om.getConfiguration()); + ozoneClient = + OzoneClientFactory.getRpcClient(ozoneManager.getConfiguration()); + executorService = HadoopExecutors.newScheduledThreadPool(1, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("OM Ranger Sync Thread - %d").build()); + rangerSyncInterval = + ozoneManager.getConfiguration().getInt(OZONE_OM_RANGER_SYNC_INTERVAL, + OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT); + scheduleNextRangerSync(); + ozoneServiceId = authorizer.getOzoneServiceId(); + } catch (Exception e) { + LOG.warn("Failed to Initialize Ranger Background Sync"); + throw e; + } + } + + public int getOzoneServiceId() throws Exception { + return ozoneServiceId; + } + + public int getRangerSyncInterval() throws Exception { + return rangerSyncInterval; + } + + @Override + public void run() { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + try { + if (ozoneManager.isLeaderReady()) { + executeOneRangerSyncCycle(); + } + } catch (Exception e) { + LOG.warn("Exception during OM Ranger Background Sync." + e.getMessage()); + } finally { + // Lets Schedule the next cycle now. We do not deliberaty schedule at + // fixed interval to account for ranger sync processing time. + scheduleNextRangerSync(); + } + } + + private void scheduleNextRangerSync() { + + if (!Thread.currentThread().isInterrupted() && + !executorService.isShutdown()) { + rangerSyncFuture = executorService.schedule(this, + rangerSyncInterval, TimeUnit.SECONDS); + } else { + LOG.warn("Current Thread is interrupted, shutting down Ranger Sync " + + "processing thread for Ozone MultiTenant Manager."); + } + } + + @Override + public void close() { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + LOG.error("Unable to shutdown OM Ranger Background Sync properly."); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + private void executeOneRangerSyncCycle() { + int attempt = 0; + try { + // Taking the lock only makes sure that while we are reading + // the current Ozone service version, another multitenancy + // request is not changing it. We can drop the lock after that. + while (!ozoneManager.getMultiTenantManager() + .tryAcquireInProgressMtOp(WAIT_MILI)) { + sleep(10); + } + currentOzoneServiceVersionInOMDB = getOmdbRangerServiceVersion(); + proposedOzoneServiceVersionInOMDB = getRangerServiceVersion(); + while (currentOzoneServiceVersionInOMDB != + proposedOzoneServiceVersionInOMDB) { + if (++attempt > MAX_ATTEMPT) { + break; + } + ozoneManager.getMultiTenantManager().resetInProgressMtOpState(); + if (!ozoneManager.isLeaderReady()) { + return; + } + LOG.warn("Executing Ranger Sync Cycle."); + + executeOmdbToRangerSync(currentOzoneServiceVersionInOMDB); + + if (currentOzoneServiceVersionInOMDB != + proposedOzoneServiceVersionInOMDB) { + // Submit Ratis Request to sync the new ozone service version in OMDB + setOmdbRangerServiceVersion(proposedOzoneServiceVersionInOMDB); + } + while (!ozoneManager.getMultiTenantManager() + .tryAcquireInProgressMtOp(WAIT_MILI)) { + sleep(10); + } + currentOzoneServiceVersionInOMDB = proposedOzoneServiceVersionInOMDB; + proposedOzoneServiceVersionInOMDB = getRangerServiceVersion(); + } + } catch (Exception e) { + LOG.warn("Exception during a Ranger Sync Cycle. " + e.getMessage() ); Review comment: Also if/when we switch to using the `RangerRestMultiTenantAccessController` there should only be a small set of checked exceptions which we will need to declare caught here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
