http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java deleted file mode 100644 index 42770df..0000000 --- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java +++ /dev/null @@ -1,530 +0,0 @@ -/* - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
- */ - -package org.apache.sentry.provider.db.service.persistent; - -import org.apache.sentry.core.common.utils.PubSub; -import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; - -import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME; -import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.jdo.JDODataStoreException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.thrift.TException; -import org.apache.sentry.service.thrift.SentryHMSClient; -import org.apache.sentry.service.thrift.HiveConnectionFactory; -import org.apache.sentry.service.thrift.HiveNotificationFetcher; -import org.apache.sentry.api.common.SentryServiceUtil; -import org.apache.sentry.service.thrift.SentryStateBank; -import org.apache.sentry.service.thrift.SentryServiceState; -import org.apache.sentry.service.thrift.HMSFollowerState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * HMSFollower is the thread which follows the Hive MetaStore state changes from Sentry. - * It gets the full update and notification logs from HMS and applies it to - * update permissions stored in Sentry using SentryStore and also update the < obj,path > state - * stored for HDFS-Sentry sync. 
- */ -public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { - - private static final Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class); - private static final String FULL_UPDATE_TRIGGER = "FULL UPDATE TRIGGER: "; - private static boolean connectedToHms = false; - - private SentryHMSClient client; - private final Configuration authzConf; - private final SentryStore sentryStore; - private final NotificationProcessor notificationProcessor; - private boolean readyToServe; - private final HiveNotificationFetcher notificationFetcher; - private final boolean hdfsSyncEnabled; - private final AtomicBoolean fullUpdateHMS = new AtomicBoolean(false); - - private final LeaderStatusMonitor leaderMonitor; - - /** - * Current generation of HMS snapshots. HMSFollower is single-threaded, so no need - * to protect against concurrent modification. - */ - private long hmsImageId = SentryStore.EMPTY_PATHS_SNAPSHOT_ID; - - /** - * Configuring Hms Follower thread. - * - * @param conf sentry configuration - * @param store sentry store - * @param leaderMonitor singleton instance of LeaderStatusMonitor - */ - public HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor, - HiveConnectionFactory hiveConnectionFactory) { - this(conf, store, leaderMonitor, hiveConnectionFactory, null); - } - - /** - * Constructor should be used only for testing purposes. 
- * - * @param conf sentry configuration - * @param store sentry store - * @param leaderMonitor - * @param authServerName Server that sentry is Authorizing - */ - @VisibleForTesting - public HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor, - HiveConnectionFactory hiveConnectionFactory, String authServerName) { - LOGGER.info("HMSFollower is being initialized"); - readyToServe = false; - authzConf = conf; - this.leaderMonitor = leaderMonitor; - sentryStore = store; - - if (authServerName == null) { - authServerName = conf.get(AUTHZ_SERVER_NAME.getVar(), - conf.get(AUTHZ_SERVER_NAME_DEPRECATED.getVar(), AUTHZ_SERVER_NAME_DEPRECATED.getDefault())); - } - - notificationProcessor = new NotificationProcessor(sentryStore, authServerName, authzConf); - client = new SentryHMSClient(authzConf, hiveConnectionFactory); - hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabledNoCache(authzConf); // no cache to test different settings for hdfs sync - notificationFetcher = new HiveNotificationFetcher(sentryStore, hiveConnectionFactory); - - // subscribe to full update notification - if (conf.getBoolean(ServerConfig.SENTRY_SERVICE_FULL_UPDATE_PUBSUB, false)) { - LOGGER.info(FULL_UPDATE_TRIGGER + "subscribing to topic " + PubSub.Topic.HDFS_SYNC_HMS.getName()); - PubSub.getInstance().subscribe(PubSub.Topic.HDFS_SYNC_HMS, this); - } - if(!hdfsSyncEnabled) { - try { - // Clear all the HMS metadata learned so far and learn it fresh when the feature - // is enabled back. - store.clearHmsPathInformation(); - } catch (Exception ex) { - LOGGER.error("Failed to clear HMS path info", ex); - LOGGER.error("Please manually clear data from SENTRY_PATH_CHANGE/AUTHZ_PATH/AUTHZ_PATHS_MAPPING tables." 
+ - "If not, HDFS ACL's will be inconsistent when HDFS sync feature is enabled back."); - } - } - } - - @VisibleForTesting - public static boolean isConnectedToHms() { - return connectedToHms; - } - - @VisibleForTesting - void setSentryHmsClient(SentryHMSClient client) { - this.client = client; - } - - @Override - public void close() { - if (client != null) { - // Close any outstanding connections to HMS - try { - client.disconnect(); - SentryStateBank.disableState(HMSFollowerState.COMPONENT,HMSFollowerState.CONNECTED); - } catch (Exception failure) { - LOGGER.error("Failed to close the Sentry Hms Client", failure); - } - } - - notificationFetcher.close(); - } - - @Override - public void run() { - SentryStateBank.enableState(HMSFollowerState.COMPONENT,HMSFollowerState.STARTED); - long lastProcessedNotificationId; - try { - try { - // Initializing lastProcessedNotificationId based on the latest persisted notification ID. - lastProcessedNotificationId = sentryStore.getLastProcessedNotificationID(); - } catch (Exception e) { - LOGGER.error("Failed to get the last processed notification id from sentry store, " - + "Skipping the processing", e); - return; - } - // Wake any clients connected to this service waiting for HMS already processed notifications. - wakeUpWaitingClientsForSync(lastProcessedNotificationId); - // Only the leader should listen to HMS updates - if (!isLeader()) { - // Close any outstanding connections to HMS - close(); - return; - } - syncupWithHms(lastProcessedNotificationId); - } finally { - SentryStateBank.disableState(HMSFollowerState.COMPONENT,HMSFollowerState.STARTED); - } - } - - private boolean isLeader() { - return (leaderMonitor == null) || leaderMonitor.isLeader(); - } - - @VisibleForTesting - String getAuthServerName() { - return notificationProcessor.getAuthServerName(); - } - - /** - * Processes new Hive Metastore notifications. 
- * - * <p>If no notifications are processed yet, then it - * does a full initial snapshot of the Hive Metastore followed by new notifications updates that - * could have happened after it. - * - * <p>Clients connections waiting for an event notification will be - * woken up afterwards. - */ - void syncupWithHms(long notificationId) { - try { - client.connect(); - connectedToHms = true; - SentryStateBank.enableState(HMSFollowerState.COMPONENT,HMSFollowerState.CONNECTED); - } catch (Throwable e) { - LOGGER.error("HMSFollower cannot connect to HMS!!", e); - return; - } - - try { - if (hdfsSyncEnabled) { - // Before getting notifications, checking if a full HMS snapshot is required. - if (isFullSnapshotRequired(notificationId)) { - createFullSnapshot(); - return; - } - } else if (isSentryOutOfSync(notificationId)) { - // Out-of-sync, fetching all the notifications - // in HMS NOTIFICATION_LOG. - sentryStore.setLastProcessedNotificationID(0L); - notificationId = 0L; - } - - Collection<NotificationEvent> notifications = - notificationFetcher.fetchNotifications(notificationId); - - // After getting notifications, check if HMS did some clean-up and notifications - // are out-of-sync with Sentry. - if (hdfsSyncEnabled && - areNotificationsOutOfSync(notifications, notificationId)) { - // Out-of-sync, taking a HMS full snapshot. - createFullSnapshot(); - return; - } - - if (!readyToServe) { - // Allow users and/or applications who look into the Sentry console output to see - // when Sentry is ready to serve. - System.out.println("Sentry HMS support is ready"); - readyToServe = true; - } - - // Continue with processing new notifications if no snapshots are done. - processNotifications(notifications); - } catch (TException e) { - LOGGER.error("An error occurred while fetching HMS notifications: ", e); - close(); - } catch (Throwable t) { - // catching errors to prevent the executor to halt. - LOGGER.error("Exception in HMSFollower! 
Caused by: " + t.getMessage(), t); - - close(); - } - } - - /** - * Checks if a new full HMS snapshot request is needed by checking if: - * <ul> - * <li>Sentry HMS Notification table is EMPTY</li> - * <li>HDFSSync is enabled and Sentry Authz Snapshot table is EMPTY</li> - * <li>The current notification Id on the HMS is less than the - * latest processed by Sentry.</li> - * <li>Full Snapshot Signal is detected</li> - * </ul> - * - * @param latestSentryNotificationId The notification Id to check against the HMS - * @return True if a full snapshot is required; False otherwise. - * @throws Exception If an error occurs while checking the SentryStore or the HMS client. - */ - private boolean isFullSnapshotRequired(long latestSentryNotificationId) throws Exception { - if (sentryStore.isHmsNotificationEmpty()) { - LOGGER.debug("Sentry Store has no HMS Notifications. Create Full HMS Snapshot. " - + "latest sentry notification Id = {}", latestSentryNotificationId); - return true; - } - - // Once HDFS sync is enabled, and if MAuthzPathsMapping - // table is still empty, we need to request a full snapshot - if (sentryStore.isAuthzPathsSnapshotEmpty()) { - LOGGER.debug("HDFSSync is enabled and MAuthzPathsMapping table is empty." + - " Need to request a full snapshot"); - return true; - } - - if(isSentryOutOfSync(latestSentryNotificationId)) { - return true; - } - - // check if forced full update is required, reset update flag to false - // to only do it once per forced full update request. - if (fullUpdateHMS.compareAndSet(true, false)) { - LOGGER.info(FULL_UPDATE_TRIGGER + "initiating full HMS snapshot request"); - return true; - } - - return false; - } - - /** - * Checks the last notification processed by sentry and the current event-id of - * HMS to see if sentry is out of sync. 
- * - * @param latestSentryNotificationId The notification Id to check against the HMS - * @return True, sentry is out-of-sync, False otherwise - * @throws Exception If an error occurs while fetching the current notification from HMS - */ - private boolean isSentryOutOfSync(long latestSentryNotificationId) throws Exception { - long currentHmsNotificationId = notificationFetcher.getCurrentNotificationId(); - if (currentHmsNotificationId < latestSentryNotificationId) { - LOGGER.info("The current notification ID on HMS = {} is less than the latest processed Sentry " - + "notification ID = {}. Sentry, Out-of-sync", - currentHmsNotificationId, latestSentryNotificationId); - return true; - } - return false; - } - - /** - * Checks if the HMS and Sentry processed notifications are out-of-sync. - * This could happen because the HMS did some clean-up of old notifications - * and Sentry was not requesting notifications during that time. - * - * @param events All new notifications to check for an out-of-sync. - * @param latestProcessedId The latest notification processed by Sentry to check against the - * list of notifications events. - * @return True if an out-of-sync is found; False otherwise. - */ - private boolean areNotificationsOutOfSync(Collection<NotificationEvent> events, - long latestProcessedId) { - if (events.isEmpty()) { - return false; - } - - /* - * If the sequence of notifications has a gap, then an out-of-sync might - * have happened due to the following issue: - * - * - HDFS sync was disabled or Sentry was shutdown for a time period longer than - * the HMS notification clean-up thread causing old notifications to be deleted. - * - * HMS notifications may contain both gaps in the sequence and duplicates - * (the same ID repeated more then once for different events). - * - * To accept duplicates (see NotificationFetcher for more info), then a gap is found - * if the 1st notification received is higher than the current ID processed + 1. - * i.e. 
- * 1st ID = 3, latest ID = 3 (duplicate found but no gap detected) - * 1st ID = 4, latest ID = 3 (consecutive ID found but no gap detected) - * 1st ID = 5, latest ID = 3 (a gap is detected) - */ - - List<NotificationEvent> eventList = (List<NotificationEvent>) events; - long firstNotificationId = eventList.get(0).getEventId(); - - if (firstNotificationId > (latestProcessedId + 1)) { - LOGGER.info("First HMS event notification Id = {} is greater than latest Sentry processed" - + "notification Id = {} + 1. Need to request a full HMS snapshot.", firstNotificationId, latestProcessedId); - return true; - } - - return false; - } - - /** - * Request for full snapshot and persists it if there is no snapshot available in the sentry - * store. Also, wakes-up any waiting clients. - * - * @return ID of last notification processed. - * @throws Exception if there are failures - */ - private long createFullSnapshot() throws Exception { - LOGGER.debug("Attempting to take full HMS snapshot"); - Preconditions.checkState(!SentryStateBank.isEnabled(SentryServiceState.COMPONENT, - SentryServiceState.FULL_UPDATE_RUNNING), - "HMSFollower shown loading full snapshot when it should not be."); - try { - // Set that the full update is running - SentryStateBank - .enableState(SentryServiceState.COMPONENT, SentryServiceState.FULL_UPDATE_RUNNING); - - PathsImage snapshotInfo = client.getFullSnapshot(); - if (snapshotInfo.getPathImage().isEmpty()) { - LOGGER.debug("Received empty path image from HMS while taking a full snapshot"); - return snapshotInfo.getId(); - } - - // Check we're still the leader before persisting the new snapshot - if (!isLeader()) { - LOGGER.info("Not persisting full snapshot since not a leader"); - return SentryStore.EMPTY_NOTIFICATION_ID; - } - try { - if (hdfsSyncEnabled) { - LOGGER.info("Persisting full snapshot for notification Id = {}", snapshotInfo.getId()); - sentryStore.persistFullPathsImage(snapshotInfo.getPathImage(), snapshotInfo.getId()); - } else { - // We 
need to persist latest notificationID for next poll - LOGGER.info("HDFSSync is disabled. Not Persisting full snapshot, " - + "but only setting last processed notification Id = {}", snapshotInfo.getId()); - sentryStore.setLastProcessedNotificationID(snapshotInfo.getId()); - } - } catch (Exception failure) { - LOGGER.error("Received exception while persisting HMS path full snapshot "); - throw failure; - } - // Wake up any HMS waiters that could have been put on hold before getting the - // eventIDBefore value. - wakeUpWaitingClientsForSync(snapshotInfo.getId()); - // HMSFollower connected to HMS and it finished full snapshot if that was required - // Log this message only once - LOGGER.info("Sentry HMS support is ready"); - return snapshotInfo.getId(); - } catch(Exception failure) { - LOGGER.error("Received exception while creating HMS path full snapshot "); - throw failure; - } finally { - SentryStateBank - .disableState(SentryServiceState.COMPONENT, SentryServiceState.FULL_UPDATE_RUNNING); - } - } - - /** - * Process the collection of notifications and wake up any waiting clients. - * Also, persists the notification ID regardless of processing result. 
- * - * @param events list of event to be processed - * @throws Exception if the complete notification list is not processed because of JDO Exception - */ - public void processNotifications(Collection<NotificationEvent> events) throws Exception { - boolean isNotificationProcessed; - if (events.isEmpty()) { - return; - } - - for (NotificationEvent event : events) { - isNotificationProcessed = false; - try { - // Only the leader should process the notifications - if (!isLeader()) { - LOGGER.debug("Not processing notifications since not a leader"); - return; - } - isNotificationProcessed = notificationProcessor.processNotificationEvent(event); - } catch (Exception e) { - if (e.getCause() instanceof JDODataStoreException) { - LOGGER.info("Received JDO Storage Exception, Could be because of processing " - + "duplicate notification"); - if (event.getEventId() <= sentryStore.getLastProcessedNotificationID()) { - // Rest of the notifications need not be processed. - LOGGER.error("Received event with Id: {} which is smaller then the ID " - + "persisted in store", event.getEventId()); - break; - } - } else { - LOGGER.error("Processing the notification with ID:{} failed with exception {}", - event.getEventId(), e); - } - } - if (!isNotificationProcessed) { - try { - // Update the notification ID in the persistent store even when the notification is - // not processed as the content in in the notification is not valid. - // Continue processing the next notification. - LOGGER.debug("Explicitly Persisting Notification ID = {} ", event.getEventId()); - sentryStore.persistLastProcessedNotificationID(event.getEventId()); - } catch (Exception failure) { - LOGGER.error("Received exception while persisting the notification ID = {}", event.getEventId()); - throw failure; - } - } - // Wake up any HMS waiters that are waiting for this ID. 
- wakeUpWaitingClientsForSync(event.getEventId()); - } - } - - /** - * Wakes up HMS waiters waiting for a specific event notification.<p> - * - * Verify that HMS image id didn't change since the last time we looked. - * If id did, it is possible that notifications jumped backward, so reset - * the counter to the current value. - * - * @param eventId Id of a notification - */ - private void wakeUpWaitingClientsForSync(long eventId) { - CounterWait counterWait = sentryStore.getCounterWait(); - - LOGGER.debug("wakeUpWaitingClientsForSync: eventId = {}, hmsImageId = {}", eventId, hmsImageId); - // Wake up any HMS waiters that are waiting for this ID. - // counterWait should never be null, but tests mock SentryStore and a mocked one - // doesn't have it. - if (counterWait == null) { - return; - } - - long lastHMSSnapshotId = hmsImageId; - try { - // Read actual HMS image ID - lastHMSSnapshotId = sentryStore.getLastProcessedImageID(); - LOGGER.debug("wakeUpWaitingClientsForSync: lastHMSSnapshotId = {}", lastHMSSnapshotId); - } catch (Exception e) { - counterWait.update(eventId); - LOGGER.error("Failed to get the last processed HMS image id from sentry store"); - return; - } - - // Reset the counter if the persisted image ID is greater than current image ID - if (lastHMSSnapshotId > hmsImageId) { - counterWait.reset(eventId); - hmsImageId = lastHMSSnapshotId; - LOGGER.debug("wakeUpWaitingClientsForSync: reset counterWait with eventId = {}, new hmsImageId = {}", eventId, hmsImageId); - } - - LOGGER.debug("wakeUpWaitingClientsForSync: update counterWait with eventId = {}, hmsImageId = {}", eventId, hmsImageId); - counterWait.update(eventId); - } - - /** - * PubSub.Subscriber callback API - */ - @Override - public void onMessage(PubSub.Topic topic, String message) { - Preconditions.checkArgument(topic == PubSub.Topic.HDFS_SYNC_HMS, "Unexpected topic %s instead of %s", topic, PubSub.Topic.HDFS_SYNC_HMS); - LOGGER.info(FULL_UPDATE_TRIGGER + "Received [{}, {}] notification", 
topic, message); - fullUpdateHMS.set(true); - } -}
http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/LeaderStatusMonitor.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/LeaderStatusMonitor.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/LeaderStatusMonitor.java deleted file mode 100644 index c2f1ad0..0000000 --- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/LeaderStatusMonitor.java +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sentry.provider.db.service.persistent; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.leader.LeaderSelector; -import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; -import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; -import org.apache.hadoop.conf.Configuration; - -import javax.annotation.concurrent.ThreadSafe; -import java.lang.management.ManagementFactory; -import java.lang.management.RuntimeMXBean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.sentry.service.common.ServiceConstants.ServerConfig.*; - -/** - * LeaderStatusMonitor participates in the distributed leader election protocol - * and allows clients to access the global leaadership status. - * <p> - * LeaderStatusMonitor is a singleton that uses Curator framework via - * {@link HAContext}.The leadership status can be accessed via the - * {@link #isLeader()} method.<p> - * - * Usually leadership re-election is initiated by the Curator framework when one - * of the nodes disconnects from ZooKeeper, but LeaderStatusMonitor also supports - * voluntary release of the leadership via the {@link #deactivate()} method. This is - * intended to be used for debugging purposes. - * <p> - * The class also simulates leader election in non-HA environments. In such cases its - * {@link #isLeader()} method always returns True. The non-HA environment is determined - * by the absence of the SENTRY_HA_ZOOKEEPER_QUORUM in the configuration. - * - * <h2>Implementation notes</h2> - * - * <h3>Initialization</h3> - * - * Class initialization is split between the constructor and the {@link #init()} method. 
- * There are two reasons for it: - * <ul> - * <li>We do not want to pass <strong>this</strong> reference to - * {@link HAContext#newLeaderSelector(String, LeaderSelectorListener)} - * until it is fully initialized</li> - * <li>We do not want to call {@link LeaderSelector#start()} method in constructor</li> - * </ul> - * - * Since LeaderStatusMonitor is a singleton and an instance can only be obtained via the - * {@link #getLeaderStatusMonitor(Configuration)} method, we hide this construction split - * from the callers. - * - * <h3>Synchronization</h3> - * Singleton synchronization is achieved using the synchronized class builder - * {@link #getLeaderStatusMonitor(Configuration)} - * <p> - * Upon becoming a leader, the code loops in {@link #takeLeadership(CuratorFramework)} - * until it receives a deactivation signal from {@link #deactivate()}. This is synchronized - * using a {@link #lock} and condition variable {@link #cond}. - * <p> - * Access to the leadership status {@link #isLeader} is also protected by the {@link #lock}. - * This isn't strictly necessary and a volatile field would be sufficient, but since we - * already use the {@link #lock} this is more straightforward. 
- */ -@ThreadSafe -public final class LeaderStatusMonitor - extends LeaderSelectorListenerAdapter implements AutoCloseable { - - private static final Logger LOGGER = LoggerFactory.getLogger(LeaderStatusMonitor.class); - - private static final String LEADER_SELECTOR_SUFFIX = "leader"; - - /** Unique instance of the singleton object */ - private static LeaderStatusMonitor leaderStatusMonitor = null; - - private final HAContext haContext; - - /** Unique string describing this instance */ - private final String defaultIncarnationId = generateIncarnationId(); - private String incarnationId; - - /** True when not using ZooKeeeper */ - private final boolean isSingleNodeMode; - - /** Lock and condition used to signal the leader to voluntary release leadership */ - private final Lock lock = new ReentrantLock(); - /** Condition variable used to synchronize voluntary leadership release */ - private final Condition cond = lock.newCondition(); - /** Leadership status - true if leader. */ - private boolean isLeader = false; - - /** Curator framework leader monitor */ - private LeaderSelector leaderSelector = null; - - /** The number of times this incarnation has become the leader. */ - private final AtomicLong leaderCount = new AtomicLong(0); - - /** - * Constructor. Initialize state and create HA context if configuration - * specifies ZooKeeper servers. - * @param conf Configuration. The fields we are interested in are: - * <ul> - * <li>SENTRY_HA_ZOOKEEPER_QUORUM</li> - * </ul> - * Configuration is also passed to the - * {@link HAContext#newLeaderSelector(String, LeaderSelectorListener)} - * which uses more properties. 
- * @throws Exception - */ - - @VisibleForTesting - protected LeaderStatusMonitor(Configuration conf) throws Exception { - // Only enable HA configuration if zookeeper is configured - String zkServers = conf.get(SENTRY_HA_ZOOKEEPER_QUORUM, ""); - if (zkServers.isEmpty()) { - isSingleNodeMode = true; - haContext = null; - isLeader = true; - incarnationId = ""; - LOGGER.info("Leader election protocol disabled, assuming single active server"); - return; - } - isSingleNodeMode = false; - incarnationId = defaultIncarnationId; - haContext = HAContext.getHAServerContext(conf); - - LOGGER.info("Created LeaderStatusMonitor(incarnationId={}, " - + "zkServers='{}')", incarnationId, zkServers); - } - - /** - * Tests may need to provide custm incarnation ID - * @param conf confguration - * @param incarnationId custom incarnation ID - * @throws Exception - */ - @VisibleForTesting - protected LeaderStatusMonitor(Configuration conf, String incarnationId) throws Exception { - this(conf); - this.incarnationId = incarnationId; - } - - /** - * Second half of the constructor links this object with {@link HAContext} and - * starts leader election protocol. - */ - @VisibleForTesting - protected void init() { - if (isSingleNodeMode) { - return; - } - - leaderSelector = haContext.newLeaderSelector("/" + LEADER_SELECTOR_SUFFIX, this); - leaderSelector.setId(incarnationId); - leaderSelector.autoRequeue(); - leaderSelector.start(); - } - - /** - * - * @param conf Configuration. See {@link #LeaderStatusMonitor(Configuration)} for details. - * @return A global LeaderStatusMonitor instance. - * @throws Exception - */ - @SuppressWarnings("LawOfDemeter") - public static synchronized LeaderStatusMonitor getLeaderStatusMonitor(Configuration conf) - throws Exception { - if (leaderStatusMonitor == null) { - leaderStatusMonitor = new LeaderStatusMonitor(conf); - leaderStatusMonitor.init(); - } - return leaderStatusMonitor; - } - - /** - * @return number of times this leader was elected. 
Used for metrics. - */ - public long getLeaderCount() { - return leaderCount.get(); - } - - /** - * Shut down the LeaderStatusMonitor and wait for it to transition to - * standby. - */ - @Override - public void close() { - if (leaderSelector != null) { - // Shut down our Curator hooks. - leaderSelector.close(); - } - } - - /** - * Deactivate the current client, if it is active. - * In non-HA case this is a no-op. - */ - public void deactivate() { - if (isSingleNodeMode) { - return; - } - lock.lock(); - try { - cond.signal(); - } finally { - lock.unlock(); - } - } - - /** - * @return true iff we are the leader. - * In non-HA case always returns true - */ - public boolean isLeader() { - if (isSingleNodeMode) { - return true; - } - lock.lock(); - @SuppressWarnings("FieldAccessNotGuarded") - boolean leader = isLeader; - lock.unlock(); - return leader; - } - - /** - * Curator framework callback which is called when we become a leader. - * Should return only when we decide to resign. - */ - @Override - public void takeLeadership(CuratorFramework client) throws Exception { - leaderCount.incrementAndGet(); - LOGGER.info("Becoming leader in Sentry HA cluster:{}", this); - lock.lock(); - try { - isLeader = true; - // Wait until we are interrupted or receive a signal - cond.await(); - } catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - LOGGER.error("takeLeadership call interrupted:" + this, ignored); - } finally { - isLeader = false; - lock.unlock(); - LOGGER.info("Resigning from leader status in a Sentry HA cluster:{}", this); - } - } - - /** - * Generate ID for the activator. <p> - * - * Ideally we would like something like host@pid, but Java doesn't provide a good - * way to determine pid value, so we use - * {@link RuntimeMXBean#getName()} which usually contains host - * name and pid. 
- */ - private static String generateIncarnationId() { - return ManagementFactory.getRuntimeMXBean().getName(); - } - - @Override - public String toString() { - return isSingleNodeMode?"Leader election disabled": - String.format("{isSingleNodeMode=%b, incarnationId=%s, isLeader=%b, leaderCount=%d}", - isSingleNodeMode, incarnationId, isLeader, leaderCount.longValue()); - } - -} http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/NotificationProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/NotificationProcessor.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/NotificationProcessor.java deleted file mode 100644 index a771f4b..0000000 --- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/NotificationProcessor.java +++ /dev/null @@ -1,777 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.sentry.provider.db.service.persistent; - -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.Timer; -import com.codahale.metrics.Timer.Context; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAddPartitionMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterPartitionMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterTableMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateDatabaseMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateTableMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropDatabaseMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropPartitionMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropTableMessage; -import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageDeserializer; -import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException; -import org.apache.sentry.core.common.exception.SentryInvalidInputException; -import org.apache.sentry.core.common.exception.SentryNoSuchObjectException; -import org.apache.sentry.core.common.utils.PathUtils; -import org.apache.sentry.hdfs.PathsUpdate; -import org.apache.sentry.hdfs.PermissionsUpdate; -import org.apache.sentry.hdfs.SentryMalformedPathException; -import org.apache.sentry.hdfs.UniquePathsUpdate; -import org.apache.sentry.hdfs.Updateable.Update; -import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges; -import org.apache.sentry.api.service.thrift.SentryMetrics; 
-import org.apache.sentry.api.service.thrift.TSentryAuthorizable; -import org.apache.sentry.api.common.SentryServiceUtil; -import org.apache.sentry.hdfs.service.thrift.TPrivilegeEntityType; -import org.apache.sentry.hdfs.service.thrift.TPrivilegeEntity; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; - -import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE; -import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_DROP_WITH_POLICY_STORE; - - - -/** - * NotificationProcessor processes various notification events generated from - * the Hive MetaStore state change, and applies these changes to the complete - * HMS Paths snapshot or delta update stored in Sentry using SentryStore. - * - * <p>NotificationProcessor should not skip processing notification events for any reason. - * If some notification events are to be skipped, appropriate logic should be added in - * HMSFollower before invoking NotificationProcessor. - */ -final class NotificationProcessor { - - private static final Logger LOGGER = LoggerFactory.getLogger(NotificationProcessor.class); - private final SentryStore sentryStore; - private final SentryJSONMessageDeserializer deserializer; - private final String authServerName; - // These variables can be updated even after object is instantiated, for testing purposes. - private boolean syncStoreOnCreate = false; - private boolean syncStoreOnDrop = false; - private final boolean hdfsSyncEnabled; - - /** - * Configuring notification processor. 
- * - * @param sentryStore sentry backend store - * @param authServerName Server that sentry is authorizing - * @param conf sentry configuration - */ - NotificationProcessor(SentryStore sentryStore, String authServerName, - Configuration conf) { - this.sentryStore = sentryStore; - deserializer = new SentryJSONMessageDeserializer(); - this.authServerName = authServerName; - syncStoreOnCreate = Boolean - .parseBoolean(conf.get(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE.getVar(), - AUTHZ_SYNC_CREATE_WITH_POLICY_STORE.getDefault())); - syncStoreOnDrop = Boolean.parseBoolean(conf.get(AUTHZ_SYNC_DROP_WITH_POLICY_STORE.getVar(), - AUTHZ_SYNC_DROP_WITH_POLICY_STORE.getDefault())); - hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabledNoCache(conf); - } - - /** - * Split path into components on the "/" character. - * The path should not start with "/". - * This is consumed by Thrift interface, so the return result should be - * {@code List<String>} - * - * @param path input path e.g. {@code foo/bar} - * @return list of components, e.g. [foo, bar] - */ - private static List<String> splitPath(String path) { - return Lists.newArrayList(PathUtils.splitPath(path)); - } - - /** - * Constructs permission update to be persisted for drop event that can be persisted - * from thrift object. - * - * @param authorizable thrift object that is dropped. 
- * @return update to be persisted - * @throws SentryInvalidInputException if the required fields are set in argument provided - */ - @VisibleForTesting - static Update getPermUpdatableOnDrop(TSentryAuthorizable authorizable) - throws SentryInvalidInputException { - PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false); - String authzObj = SentryServiceUtil.getAuthzObj(authorizable); - update.addPrivilegeUpdate(authzObj) - .putToDelPrivileges(new TPrivilegeEntity(TPrivilegeEntityType.ROLE, PermissionsUpdate.ALL_ROLES), - PermissionsUpdate.ALL_ROLES); - return update; - } - - @VisibleForTesting - String getAuthServerName() { - return authServerName; - } - - /** - * Constructs permission update to be persisted for rename event that can be persisted from thrift - * object. - * - * @param oldAuthorizable old thrift object - * @param newAuthorizable new thrift object - * @return update to be persisted - * @throws SentryInvalidInputException if the required fields are set in arguments provided - */ - @VisibleForTesting - static Update getPermUpdatableOnRename(TSentryAuthorizable oldAuthorizable, - TSentryAuthorizable newAuthorizable) - throws SentryInvalidInputException { - String oldAuthz = SentryServiceUtil.getAuthzObj(oldAuthorizable); - String newAuthz = SentryServiceUtil.getAuthzObj(newAuthorizable); - PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false); - TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS); - privUpdate.putToAddPrivileges(new TPrivilegeEntity(TPrivilegeEntityType.AUTHZ_OBJ, newAuthz), newAuthz); - privUpdate.putToDelPrivileges(new TPrivilegeEntity(TPrivilegeEntityType.AUTHZ_OBJ, oldAuthz), oldAuthz); - return update; - } - - /** - * This function is only used for testing purposes. 
- * - * @param value to be set - */ - @SuppressWarnings("SameParameterValue") - @VisibleForTesting - void setSyncStoreOnCreate(boolean value) { - syncStoreOnCreate = value; - } - - /** - * This function is only used for testing purposes. - * - * @param value to be set - */ - @SuppressWarnings("SameParameterValue") - @VisibleForTesting - void setSyncStoreOnDrop(boolean value) { - syncStoreOnDrop = value; - } - - /** - * Processes the event and persist to sentry store. - * - * @param event to be processed - * @return true, if the event is persisted to sentry store. false, if the event is not persisted. - * @throws Exception if there is an error processing the event. - */ - boolean processNotificationEvent(NotificationEvent event) throws Exception { - LOGGER - .debug("Processing event with id:{} and Type:{}", event.getEventId(), event.getEventType()); - - // Expose time used for each request time as a metric. - // We use lower-case version of the event name. - EventType eventType = EventType.valueOf(event.getEventType()); - Timer timer = SentryMetrics - .getInstance() - .getTimer(MetricRegistry.name(HMSFollower.class, eventType.toString().toLowerCase())); - - try (Context ignored = timer.time()) { - switch (eventType) { - case CREATE_DATABASE: - return processCreateDatabase(event); - case DROP_DATABASE: - return processDropDatabase(event); - case CREATE_TABLE: - return processCreateTable(event); - case DROP_TABLE: - return processDropTable(event); - case ALTER_TABLE: - return processAlterTable(event); - case ADD_PARTITION: - return processAddPartition(event); - case DROP_PARTITION: - return processDropPartition(event); - case ALTER_PARTITION: - return processAlterPartition(event); - default: - LOGGER.error("Notification with ID:{} has invalid event type: {}", event.getEventId(), - event.getEventType()); - return false; - } - } - } - - /** - * Processes "create database" notification event, and applies its corresponding - * snapshot change as well as delta path update 
into Sentry DB. - * - * @param event notification event to be processed. - * @throws Exception if encounters errors while persisting the path change - */ - private boolean processCreateDatabase(NotificationEvent event) throws Exception { - SentryJSONCreateDatabaseMessage message = - deserializer.getCreateDatabaseMessage(event.getMessage()); - String dbName = message.getDB(); - String location = message.getLocation(); - if ((dbName == null) || (location == null)) { - LOGGER.warn("Create database event " - + "has incomplete information. dbName: {} location: {}", - StringUtils.defaultIfBlank(dbName, "null"), - StringUtils.defaultIfBlank(location, "null")); - return false; - } - - if (syncStoreOnCreate) { - dropSentryDbPrivileges(dbName, event); - } - - if (hdfsSyncEnabled) { - List<String> locations = Collections.singletonList(location); - addPaths(dbName, locations, event); - - return true; - } - - return false; - } - - /** - * Processes "drop database" notification event, and applies its corresponding - * snapshot change as well as delta path update into Sentry DB. - * - * @param event notification event to be processed. 
- * @throws Exception if encounters errors while persisting the path change - */ - private boolean processDropDatabase(NotificationEvent event) throws Exception { - SentryJSONDropDatabaseMessage dropDatabaseMessage = - deserializer.getDropDatabaseMessage(event.getMessage()); - String dbName = dropDatabaseMessage.getDB(); - String location = dropDatabaseMessage.getLocation(); - if (dbName == null) { - LOGGER.warn("Drop database event has incomplete information: dbName = null"); - return false; - } - if (syncStoreOnDrop) { - dropSentryDbPrivileges(dbName, event); - } - - if (hdfsSyncEnabled) { - List<String> locations = Collections.singletonList(location); - removePaths(dbName, locations, event); - return true; - } - return false; - } - - /** - * Processes "create table" notification event, and applies its corresponding - * snapshot change as well as delta path update into Sentry DB. - * - * @param event notification event to be processed. - * @throws Exception if encounters errors while persisting the path change - */ - private boolean processCreateTable(NotificationEvent event) - throws Exception { - SentryJSONCreateTableMessage createTableMessage = deserializer - .getCreateTableMessage(event.getMessage()); - String dbName = createTableMessage.getDB(); - String tableName = createTableMessage.getTable(); - String location = createTableMessage.getLocation(); - if ((dbName == null) || (tableName == null) || (location == null)) { - LOGGER.warn(String.format("Create table event " + "has incomplete information." 
- + " dbName = %s, tableName = %s, location = %s", - StringUtils.defaultIfBlank(dbName, "null"), - StringUtils.defaultIfBlank(tableName, "null"), - StringUtils.defaultIfBlank(location, "null"))); - return false; - } - if (syncStoreOnCreate) { - dropSentryTablePrivileges(dbName, tableName, event); - } - - if (hdfsSyncEnabled) { - String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName); - List<String> locations = Collections.singletonList(location); - addPaths(authzObj, locations, event); - return true; - } - - return false; - } - - /** - * Processes "drop table" notification event. It drops all partitions belongs to - * the table as well. And applies its corresponding snapshot change as well - * as delta path update into Sentry DB. - * - * @param event notification event to be processed. - * @throws Exception if encounters errors while persisting the path change - */ - private boolean processDropTable(NotificationEvent event) throws Exception { - SentryJSONDropTableMessage dropTableMessage = deserializer - .getDropTableMessage(event.getMessage()); - String dbName = dropTableMessage.getDB(); - String tableName = dropTableMessage.getTable(); - if ((dbName == null) || (tableName == null)) { - LOGGER.warn("Drop table event " - + "has incomplete information. dbName: {}, tableName: {}", - StringUtils.defaultIfBlank(dbName, "null"), - StringUtils.defaultIfBlank(tableName, "null")); - return false; - } - if (syncStoreOnDrop) { - dropSentryTablePrivileges(dbName, tableName, event); - } - - if (hdfsSyncEnabled) { - String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName); - removeAllPaths(authzObj, event); - return true; - } - - return false; - } - - /** - * Processes "alter table" notification event, and applies its corresponding - * snapshot change as well as delta path update into Sentry DB. - * - * @param event notification event to be processed. 
- * @throws Exception if encounters errors while persisting the path change - */ - private boolean processAlterTable(NotificationEvent event) throws Exception { - - SentryJSONAlterTableMessage alterTableMessage = - deserializer.getAlterTableMessage(event.getMessage()); - String oldDbName = alterTableMessage.getDB(); - String oldTableName = alterTableMessage.getTable(); - String newDbName = event.getDbName(); - String newTableName = event.getTableName(); - String oldLocation = alterTableMessage.getOldLocation(); - String newLocation = alterTableMessage.getNewLocation(); - - if ((oldDbName == null) - || (oldTableName == null) - || (newDbName == null) - || (newTableName == null) - || (oldLocation == null) - || (newLocation == null)) { - LOGGER.warn(String.format("Alter table notification ignored since event " - + "has incomplete information. oldDbName = %s, oldTableName = %s, oldLocation = %s, " - + "newDbName = %s, newTableName = %s, newLocation = %s", - StringUtils.defaultIfBlank(oldDbName, "null"), - StringUtils.defaultIfBlank(oldTableName, "null"), - StringUtils.defaultIfBlank(oldLocation, "null"), - StringUtils.defaultIfBlank(newDbName, "null"), - StringUtils.defaultIfBlank(newTableName, "null"), - StringUtils.defaultIfBlank(newLocation, "null"))); - return false; - } - - if ((oldDbName.equals(newDbName)) - && (oldTableName.equals(newTableName)) - && (oldLocation.equals(newLocation))) { - LOGGER.debug(String.format("Alter table notification ignored as neither name nor " - + "location has changed: oldAuthzObj = %s, oldLocation = %s, newAuthzObj = %s, " - + "newLocation = %s", oldDbName + "." + oldTableName, oldLocation, - newDbName + "." 
+ newTableName, newLocation)); - return false; - } - - if (!newDbName.equalsIgnoreCase(oldDbName) || !oldTableName.equalsIgnoreCase(newTableName)) { - // Name has changed - try { - renamePrivileges(oldDbName, oldTableName, newDbName, newTableName); - } catch (SentryNoSuchObjectException e) { - LOGGER.debug("Rename Sentry privilege ignored as there are no privileges on the table:" - + " {}.{}", oldDbName, oldTableName); - } catch (Exception e) { - LOGGER.info("Could not process Alter table event. Event: {}", event.toString(), e); - return false; - } - } - - if (!hdfsSyncEnabled) { - return false; - } - String oldAuthzObj = oldDbName + "." + oldTableName; - String newAuthzObj = newDbName + "." + newTableName; - renameAuthzPath(oldAuthzObj, newAuthzObj, oldLocation, newLocation, event); - return true; - } - - /** - * Processes "add partition" notification event, and applies its corresponding - * snapshot change as well as delta path update into Sentry DB. - * - * @param event notification event to be processed. - * @throws Exception if encounters errors while persisting the path change - */ - private boolean processAddPartition(NotificationEvent event) - throws Exception { - if (!hdfsSyncEnabled) { - return false; - } - - SentryJSONAddPartitionMessage addPartitionMessage = - deserializer.getAddPartitionMessage(event.getMessage()); - String dbName = addPartitionMessage.getDB(); - String tableName = addPartitionMessage.getTable(); - List<String> locations = addPartitionMessage.getLocations(); - if ((dbName == null) || (tableName == null) || (locations == null)) { - LOGGER.warn(String.format("Create table event has incomplete information. " - + "dbName = %s, tableName = %s, locations = %s", - StringUtils.defaultIfBlank(dbName, "null"), - StringUtils.defaultIfBlank(tableName, "null"), - locations != null ? 
locations.toString() : "null")); - return false; - } - String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName); - addPaths(authzObj, locations, event); - return true; - } - - /** - * Processes "drop partition" notification event, and applies its corresponding - * snapshot change as well as delta path update into Sentry DB. - * - * @param event notification event to be processed. - * @throws Exception if encounters errors while persisting the path change - */ - private boolean processDropPartition(NotificationEvent event) - throws Exception { - if (!hdfsSyncEnabled) { - return false; - } - - SentryJSONDropPartitionMessage dropPartitionMessage = - deserializer.getDropPartitionMessage(event.getMessage()); - String dbName = dropPartitionMessage.getDB(); - String tableName = dropPartitionMessage.getTable(); - List<String> locations = dropPartitionMessage.getLocations(); - if ((dbName == null) || (tableName == null) || (locations == null)) { - LOGGER.warn(String.format("Drop partition event " - + "has incomplete information. dbName = %s, tableName = %s, location = %s", - StringUtils.defaultIfBlank(dbName, "null"), - StringUtils.defaultIfBlank(tableName, "null"), - locations != null ? locations.toString() : "null")); - return false; - } - String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName); - removePaths(authzObj, locations, event); - return true; - } - - /** - * Processes "alter partition" notification event, and applies its corresponding - * snapshot change as well as delta path update into Sentry DB. - * - * @param event notification event to be processed. 
- * @throws Exception if encounters errors while persisting the path change - */ - private boolean processAlterPartition(NotificationEvent event) throws Exception { - if (!hdfsSyncEnabled) { - return false; - } - - SentryJSONAlterPartitionMessage alterPartitionMessage = - deserializer.getAlterPartitionMessage(event.getMessage()); - String dbName = alterPartitionMessage.getDB(); - String tableName = alterPartitionMessage.getTable(); - String oldLocation = alterPartitionMessage.getOldLocation(); - String newLocation = alterPartitionMessage.getNewLocation(); - - if ((dbName == null) - || (tableName == null) - || (oldLocation == null) - || (newLocation == null)) { - LOGGER.warn(String.format("Alter partition event " - + "has incomplete information. dbName = %s, tableName = %s, " - + "oldLocation = %s, newLocation = %s", - StringUtils.defaultIfBlank(dbName, "null"), - StringUtils.defaultIfBlank(tableName, "null"), - StringUtils.defaultIfBlank(oldLocation, "null"), - StringUtils.defaultIfBlank(newLocation, "null"))); - return false; - } - - if (oldLocation.equals(newLocation)) { - LOGGER.debug(String.format("Alter partition notification ignored as" - + "location has not changed: AuthzObj = %s, Location = %s", dbName + "." - + "." + tableName, oldLocation)); - return false; - } - - String oldAuthzObj = dbName + "." + tableName; - renameAuthzPath(oldAuthzObj, oldAuthzObj, oldLocation, newLocation, event); - return true; - } - - /** - * Adds an authzObj along with a set of paths into the authzObj -> [Paths] mapping - * as well as persist the corresponding delta path change to Sentry DB. 
- * - * @param authzObj the given authzObj - * @param locations a set of paths need to be added - * @param event the NotificationEvent object from where authzObj and locations were obtained - */ - private void addPaths(String authzObj, Collection<String> locations, NotificationEvent event) - throws Exception { - // AuthzObj is case insensitive - authzObj = authzObj.toLowerCase(); - - UniquePathsUpdate update = new UniquePathsUpdate(event, false); - Collection<String> paths = new HashSet<>(locations.size()); - // addPath and persist into Sentry DB. - // Skip update if encounter malformed path. - for (String location : locations) { - String pathTree = getPath(location); - if (pathTree == null) { - LOGGER.debug("HMS Path Update [" - + "OP : addPath, " - + "authzObj : " + authzObj + ", " - + "path : " + location + "] - nothing to add" + ", " - + "notification event ID: " + event.getEventId() + "]"); - } else { - LOGGER.debug("HMS Path Update [" - + "OP : addPath, " + "authzObj : " - + authzObj + ", " - + "path : " + location + ", " - + "notification event ID: " + event.getEventId() + "]"); - update.newPathChange(authzObj).addToAddPaths(splitPath(pathTree)); - paths.add(pathTree); - } - } - sentryStore.addAuthzPathsMapping(authzObj, paths, update); - } - - /** - * Removes a set of paths map to a given authzObj from the authzObj -> [Paths] mapping - * as well as persist the corresponding delta path change to Sentry DB. 
- * - * @param authzObj the given authzObj - * @param locations a set of paths need to be removed - * @param event the NotificationEvent object from where authzObj and locations were obtained - */ - private void removePaths(String authzObj, Collection<String> locations, NotificationEvent event) - throws Exception { - // AuthzObj is case insensitive - authzObj = authzObj.toLowerCase(); - - UniquePathsUpdate update = new UniquePathsUpdate(event, false); - Collection<String> paths = new HashSet<>(locations.size()); - for (String location : locations) { - String pathTree = getPath(location); - if (pathTree == null) { - LOGGER.debug("HMS Path Update [" - + "OP : removePath, " - + "authzObj : " + authzObj + ", " - + "path : " + location + "] - nothing to remove" + ", " - + "notification event ID: " + event.getEventId() + "]"); - } else { - LOGGER.debug("HMS Path Update [" - + "OP : removePath, " - + "authzObj : " + authzObj + ", " - + "path : " + location + ", " - + "notification event ID: " + event.getEventId() + "]"); - update.newPathChange(authzObj).addToDelPaths(splitPath(pathTree)); - paths.add(pathTree); - } - } - sentryStore.deleteAuthzPathsMapping(authzObj, paths, update); - } - - /** - * Removes a given authzObj and all paths belongs to it from the - * authzObj -> [Paths] mapping as well as persist the corresponding - * delta path change to Sentry DB. 
- * - * @param authzObj the given authzObj to be deleted - * @param event the NotificationEvent object from where authzObj and locations were obtained - */ - private void removeAllPaths(String authzObj, NotificationEvent event) - throws Exception { - // AuthzObj is case insensitive - authzObj = authzObj.toLowerCase(); - - LOGGER.debug("HMS Path Update [" - + "OP : removeAllPaths, " - + "authzObj : " + authzObj + ", " - + "notification event ID: " + event.getEventId() + "]"); - UniquePathsUpdate update = new UniquePathsUpdate(event, false); - update.newPathChange(authzObj).addToDelPaths( - Lists.newArrayList(PathsUpdate.ALL_PATHS)); - sentryStore.deleteAllAuthzPathsMapping(authzObj, update); - } - - /** - * Renames a given authzObj and alter the paths belongs to it from the - * authzObj -> [Paths] mapping as well as persist the corresponding - * delta path change to Sentry DB. - * - * @param oldAuthzObj the existing authzObj - * @param newAuthzObj the new name to be changed to - * @param oldLocation a existing path of the given authzObj - * @param newLocation a new path to be changed to - * @param event the NotificationEvent object from where authzObj and locations were obtained - */ - private void renameAuthzPath(String oldAuthzObj, String newAuthzObj, String oldLocation, - String newLocation, NotificationEvent event) throws Exception { - // AuthzObj is case insensitive - oldAuthzObj = oldAuthzObj.toLowerCase(); - newAuthzObj = newAuthzObj.toLowerCase(); - String oldPathTree = getPath(oldLocation); - String newPathTree = getPath(newLocation); - - LOGGER.debug("HMS Path Update [" - + "OP : renameAuthzObject, " - + "oldAuthzObj : " + oldAuthzObj + ", " - + "newAuthzObj : " + newAuthzObj + ", " - + "oldLocation : " + oldLocation + ", " - + "newLocation : " + newLocation + ", " - + "notification event ID: " + event.getEventId() + "]"); - - // In the case of HiveObj name has changed - if (!oldAuthzObj.equalsIgnoreCase(newAuthzObj)) { - // Skip update if encounter 
malformed path for both oldLocation and newLocation. - if ((oldPathTree != null) && (newPathTree != null)) { - UniquePathsUpdate update = new UniquePathsUpdate(event, false); - update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree)); - update.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree)); - if (oldLocation.equals(newLocation)) { - //Only name has changed - // - Alter table rename for an external table - sentryStore.renameAuthzObj(oldAuthzObj, newAuthzObj, update); - } else { - // Both name and location has changed - // - Alter table rename for managed table - sentryStore.renameAuthzPathsMapping(oldAuthzObj, newAuthzObj, oldPathTree, - newPathTree, update); - } - } else { - updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newAuthzObj, newPathTree, event); - } - } else if (!oldLocation.equals(newLocation)) { - // Only Location has changed, e.g. Alter table set location - if ((oldPathTree != null) && (newPathTree != null)) { - UniquePathsUpdate update = new UniquePathsUpdate(event, false); - update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree)); - update.newPathChange(oldAuthzObj).addToAddPaths(splitPath(newPathTree)); - sentryStore.updateAuthzPathsMapping(oldAuthzObj, oldPathTree, - newPathTree, update); - } else { - updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newAuthzObj, newPathTree,event); - } - } else { - // This code should not be hit as appropriate checks are performed by the callers of this method. 
- LOGGER.error("Update Notification for Auhorizable object {}, with no change, skipping", - oldAuthzObj); - throw new SentryInvalidHMSEventException("Update Notification for Authorizable object" - + "with no change"); - } - } - - private void updateAuthzPathsMapping(String oldAuthzObj, String oldPathTree, - String newAuthzObj, String newPathTree, NotificationEvent event) throws Exception { - if (oldPathTree != null) { - UniquePathsUpdate update = new UniquePathsUpdate(event, false); - update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree)); - sentryStore.deleteAuthzPathsMapping(oldAuthzObj, - Collections.singleton(oldPathTree), - update); - } else if (newPathTree != null) { - UniquePathsUpdate update = new UniquePathsUpdate(event, false); - update.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree)); - sentryStore.addAuthzPathsMapping(newAuthzObj, - Collections.singleton(newPathTree), - update); - } - - } - - /** - * Get path tree from a given path. It return null if encounters - * SentryMalformedPathException which indicates a malformed path. - * - * @param path a path - * @return the path tree given a path. - */ - private String getPath(String path) { - try { - return PathsUpdate.parsePath(path); - } catch (SentryMalformedPathException e) { - LOGGER.error("Unexpected path while parsing {}", path, e); - } - return null; - } - - private void dropSentryDbPrivileges(String dbName, NotificationEvent event) { - try { - TSentryAuthorizable authorizable = new TSentryAuthorizable(authServerName); - authorizable.setDb(dbName); - sentryStore.dropPrivilege(authorizable, getPermUpdatableOnDrop(authorizable)); - } catch (SentryNoSuchObjectException e) { - LOGGER.debug("Drop Sentry privilege ignored as there are no privileges on the database: {}", - dbName); - } catch (Exception e) { - LOGGER.error("Could not process Drop database event." 
+ "Event: " + event.toString(), e); - } - } - - private void dropSentryTablePrivileges(String dbName, String tableName, - NotificationEvent event) { - try { - TSentryAuthorizable authorizable = new TSentryAuthorizable(authServerName); - authorizable.setDb(dbName); - authorizable.setTable(tableName); - sentryStore.dropPrivilege(authorizable, getPermUpdatableOnDrop(authorizable)); - } catch (SentryNoSuchObjectException e) { - LOGGER.debug("Drop Sentry privilege ignored as there are no privileges on the table: {}.{}", - dbName, tableName); - } catch (Exception e) { - LOGGER.error("Could not process Drop table event. Event: " + event.toString(), e); - } - } - - private void renamePrivileges(String oldDbName, String oldTableName, String newDbName, - String newTableName) throws - Exception { - TSentryAuthorizable oldAuthorizable = new TSentryAuthorizable(authServerName); - oldAuthorizable.setDb(oldDbName); - oldAuthorizable.setTable(oldTableName); - TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(authServerName); - newAuthorizable.setDb(newDbName); - newAuthorizable.setTable(newTableName); - Update update = - getPermUpdatableOnRename(oldAuthorizable, newAuthorizable); - sentryStore.renamePrivilege(oldAuthorizable, newAuthorizable, update); - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java deleted file mode 100644 index 9813a5a..0000000 --- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sentry.provider.db.service.persistent; - -import java.util.Collection; -import java.util.Map; - -/** - * A container for complete hive paths snapshot. - * <p> - * It is composed by a hiveObj to Paths mapping, a paths image ID and the sequence number/change ID - * of latest delta change that the snapshot maps to. - */ -public class PathsImage { - - // A full image of hiveObj to Paths mapping. 
- private final Map<String, Collection<String>> pathImage; - private final long id; - private final long curImgNum; - - public PathsImage(Map<String, Collection<String>> pathImage, long id, long curImgNum) { - this.pathImage = pathImage; - this.id = id; - this.curImgNum = curImgNum; - } - - public long getId() { - return id; - } - - public long getCurImgNum() { - return curImgNum; - } - - public Map<String, Collection<String>> getPathImage() { - return pathImage; - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/PermissionsImage.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/PermissionsImage.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/PermissionsImage.java deleted file mode 100644 index 4a02db2..0000000 --- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/PermissionsImage.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sentry.provider.db.service.persistent; - -import org.apache.sentry.hdfs.service.thrift.TPrivilegeEntity; - -import java.util.List; -import java.util.Map; - -/** - * A container for complete sentry permission snapshot. - * <p> - * It is composed by a role to groups mapping, and hiveObj to < role/user, privileges > mapping. - * It also has the sequence number/change ID of latest delta change that the snapshot maps to. - */ -public class PermissionsImage { - - // A full snapshot of sentry role to groups mapping. - private final Map<String, List<String>> roleImage; - - // A full snapshot of hiveObj to <role/user, privileges> mapping. - private final Map<String, Map<TPrivilegeEntity, String>> privilegeImage; - private final long curSeqNum; - - public PermissionsImage(Map<String, List<String>> roleImage, - Map<String, Map<TPrivilegeEntity, String>> privilegeImage, long curSeqNum) { - this.roleImage = roleImage; - this.privilegeImage = privilegeImage; - this.curSeqNum = curSeqNum; - } - - public long getCurSeqNum() { - return curSeqNum; - } - - public Map<String, Map<TPrivilegeEntity, String>> getPrivilegeImage() { - return privilegeImage; - } - - public Map<String, List<String>> getRoleImage() { - return roleImage; - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/9351d19d/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/QueryParamBuilder.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/QueryParamBuilder.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/QueryParamBuilder.java deleted file mode 100644 index 6075e3f..0000000 --- 
a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/QueryParamBuilder.java +++ /dev/null @@ -1,429 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sentry.provider.db.service.persistent; - -import com.google.common.base.Joiner; -import org.apache.sentry.provider.db.service.model.MSentryRole; -import org.apache.sentry.provider.db.service.model.MSentryUser; - -import javax.annotation.concurrent.NotThreadSafe; -import javax.jdo.Query; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -/** - * The QueryParamBuilder provides mechanism for constructing complex JDOQL queries with parameters. - * <p> - * There are many places where we want to construct a non-trivial parametrized query, - * often with sub-queries. A simple query expression is just a list of conditions joined - * by operation (either && or ||). More complicated query may contain sub-expressions which may contain - * further sub-expressions, for example {@code (AA && (B | (C && D)))}. 
- * <p> - * Query may contain parameters, which usually come from external sources (e.g. Thrift requests). For example, - * to search for a record containing specific database name we can use something like - * {@code "dbName == " + request.getDbName()}. This opens a possibility JDOQL injection with carefully - * constructed requests. To avoid this we need to parameterize the query into something like - * {@code "dbName == :dbMame"} and pass {@code dbName} as a query parameter. (The colon in {@code :dbName} - * tells Datanucleus to automatically figure out the type of the {@code dbName} parameter). We collect all - * such parameters in a map and use {@code executeWithMap()} method of the query to pass all collected - * parameters to a query. - * <h2>Representing the query and sub-queries</h2> - * - * A query is represented as - * <ul> - * <li>Top-level query operation which is always && or ||</li> - * <li>List of strings that constitute simple subexpressions which are joined with the top level - * operator when the final query string is constructed</li> - * <li>List of sub-expressions. Each sub-expression is another QueryParamBuilder that shares - * the parameter map with the parent. Usually the top-level operation of the sub-expression - * is the inverse of the top-level operation if the parent. Since - * {@code (A && (B && C))} can be simplified as {@code (A && B && C)}, there is no - * need for parent and child to have the same top-level operation. 
- * Children are added using the {@link #newChild()} method.</li> - * </ul> - * - * <h2>Constructing the string representation of a query</h2> - * - * Once the query is fully constructed, we can get its string reopresentation suitable for the - * {@code setFilter()} method of {@link javax.jdo.Query} by using {@link #toString()} method - * which does the following: - * <ul> - * <li>Combines all accumulated simple subexpressions using the top-level operator</li> - * <li>Recursively combines children QueryParamBuilder objects using their {@link #toString()} - * methods</li> - * <li>Joins result with the top-level operation</li> - * </ul> - * <em>NOTE that we do not guarantee specific order of expressions within each level</em>. - * <p> - * The class also provides useful common methods for checking that some field is or - * isn't <em>NULL</em> and method <em>addSet()</em> to add dynamic set of key/values<p> - * - * Most class methods return <em>this</em> so it is possible to chain calls together. - * <p> - * - * The class is not thread-safe. - * <p> - * Examples: - * <ol> - * <li> - * <pre>{@code - * QueryParamBuilder p = newQueryParamBuilder(); - * p.add("key1", "val1").add("key2", "val2") - * - * // Returns "(this.key1 == :key1 && this.key2 == :key2)" - * String queryStr = p.toString(); - * - * // Returns map {"key1": "val1", "key2": "val2"} - * Map<String, Object> args = p.getArguments(); - * - * Query query = pm.newQuery(Foo.class); - * query.setFilter(queryStr); - * query.executeWIthMap(args); - * }</pre> - * </li> - * <li> - * <pre>{@code - * QueryParamBuilder p = newQueryParamBuilder(); - * p.add("key1", "val1").add("key2", "val2") - * .newChild() // Inverts logical op from && to || - * .add("key3", "val3") - * .add("key4", "val4"). 
- * - * // Returns "(this.key1 == :val1 && this.key2 == :val2 && \ - * // (this.key3 == :val4 || this.key4 == val4))" - * String queryStr1 = p.toString() - * }</pre> - * </li> - * </ol> - * - * @see <a href="http://www.datanucleus.org/products/datanucleus/jdo/jdoql.html">Datanucleus JDOQL</a> - */ -@NotThreadSafe -public class QueryParamBuilder { - - /** - * Representation of the top-level query operator. - * Query is built by joining all parts with the specified Op. - */ - enum Op { - AND(" && "), - OR(" || "); - - public String toString() { - return value; - } - - private final String value; - - /** Constructor from string */ - Op(String val) { - this.value = val; - } - } - - // Query parts that will be joined with Op - private final List<String> queryParts = new LinkedList<>(); - // List of children - allocated lazily when children are added - private List<QueryParamBuilder> children; - // Query Parameters - private final Map<String, Object> arguments; - // paramId is used for automatically generating variable names - private final AtomicLong paramId; - // Join operation - private final String operation; - - /** - * Create new {@link QueryParamBuilder} - * @return the default {@link QueryParamBuilder} with && top-level operation. - */ - public static QueryParamBuilder newQueryParamBuilder() { - return new QueryParamBuilder(); - } - - /** - * Create new {@link QueryParamBuilder} with specific top-level operator - * @param operation top-level operation for subexpressions - * @return {@link QueryParamBuilder} with specified top-level operator - */ - public static QueryParamBuilder newQueryParamBuilder(Op operation) { - return new QueryParamBuilder(operation); - } - - /** - * Create a new child builder and attach to this one. Child's join operation is the - * inverse of the parent - * @return new child of the QueryBuilder - */ - public QueryParamBuilder newChild() { - // Reverse operation of this builder - Op operation = this.operation.equals(Op.AND.toString()) ? 
Op.OR : Op.AND; - return this.newChild(operation); - } - - /** - * Create a new child builder attached to this one - * @param operation - join operation - * @return new child of the QueryBuilder - */ - private QueryParamBuilder newChild(Op operation) { - QueryParamBuilder child = new QueryParamBuilder(this, operation); - if (children == null) { - children = new LinkedList<>(); - } - children.add(child); - return child; - } - - /** - * Get query arguments - * @return query arguments as a map of arg name and arg value suitable for - * query.executeWithMap - */ - public Map<String, Object> getArguments() { - return arguments; - } - - /** - * Get query string - reconstructs the query string from all the parts and children. - * @return Query string which can be matched with arguments to execute a query. - */ - @Override - public String toString() { - if (children == null && queryParts.isEmpty()) { - return ""; - } - if (children == null) { - return "(" + Joiner.on(operation).join(queryParts) + ")"; - } - // Concatenate our query parts with all children - List<String> result = new LinkedList<>(queryParts); - for (Object child: children) { - result.add(child.toString()); - } - return "(" + Joiner.on(operation).join(result) + ")"; - } - - /** - * Add parameter for field fieldName with given value where value is any Object - * @param fieldName Field name to query for - * @param value Field value (can be any Object) - */ - public QueryParamBuilder addObject(String fieldName, Object value) { - return addCustomParam("this." + fieldName + " == :" + fieldName, - fieldName, value); - } - - /** - * Add string parameter for field fieldName - * @param fieldName name of the field - * @param value String value. 
Value is normalized - converted to lower case and trimmed - * @return this - */ - public QueryParamBuilder add(String fieldName, String value) { - return addCommon(fieldName, value, false); - } - - /** - * Add string parameter to field value with or without normalization - * @param fieldName field name of the field - * @param value String value, inserted as is if preserveCase is true, normalized otherwise - * @param preserveCase if true, trm and lowercase the value. - * @return this - */ - public QueryParamBuilder add(String fieldName, String value, boolean preserveCase) { - return addCommon(fieldName, value, preserveCase); - } - - /** - * Add condition that fieldName is not equal to NULL - * @param fieldName field name to compare to NULL - * @return this - */ - public QueryParamBuilder addNotNull(String fieldName) { - queryParts.add(String.format("this.%s != \"%s\"", fieldName, SentryStore.NULL_COL)); - return this; - } - - /** - * Add condition that fieldName is equal to NULL - * @param fieldName field name to compare to NULL - * @return this - */ - public QueryParamBuilder addNull(String fieldName) { - queryParts.add(String.format("this.%s == \"%s\"", fieldName, SentryStore.NULL_COL)); - return this; - } - - /** - * Add custom string for evaluation together with a single parameter. - * This is used in cases where we need expression different from this.name == value - * @param expr String expression containing ':< paramName>' somewhere - * @param paramName parameter name - * @param value parameter value - * @return this - */ - QueryParamBuilder addCustomParam(String expr, String paramName, Object value) { - arguments.put(paramName, value); - queryParts.add(expr); - return this; - } - - /** - * Add arbitrary query string without parameters - * @param expr String expression - * @return this - */ - QueryParamBuilder addString(String expr) { - queryParts.add(expr); - return this; - } - - /** - * Add common filter for set of Sentry roles. 
This is used to simplify creating filters for - * privileges belonging to the specified set of roles. - * @param query Query used for search - * @param paramBuilder paramBuilder for parameters - * @param roleNames set of role names - * @return paramBuilder supplied or a new one if the supplied one is null. - */ - public static QueryParamBuilder addRolesFilter(Query query, QueryParamBuilder paramBuilder, - Set<String> roleNames) { - query.declareVariables(MSentryRole.class.getName() + " role"); - if (paramBuilder == null) { - paramBuilder = new QueryParamBuilder(); - } - if (roleNames == null || roleNames.isEmpty()) { - return paramBuilder; - } - paramBuilder.newChild().addSet("role.roleName == ", roleNames); - paramBuilder.addString("roles.contains(role)"); - return paramBuilder; - } - - /** - * Add common filter for set of Sentry users. This is used to simplify creating filters for - * privileges belonging to the specified set of users. - * @param query Query used for search - * @param paramBuilder paramBuilder for parameters - * @param userNames set of user names - * @return paramBuilder supplied or a new one if the supplied one is null. - */ - public static QueryParamBuilder addUsersFilter(Query query, QueryParamBuilder paramBuilder, - Set<String> userNames) { - query.declareVariables(MSentryUser.class.getName() + " user"); - if (paramBuilder == null) { - paramBuilder = new QueryParamBuilder(); - } - if (userNames == null || userNames.isEmpty()) { - return paramBuilder; - } - paramBuilder.newChild().addSet("user.userName == ", userNames); - paramBuilder.addString("users.contains(user)"); - return paramBuilder; - } - - /** - * Add multiple conditions for set of values. 
- * <p> - * Example: - * <pre> - * Set<String>names = new HashSet<>(); - * names.add("foo"); - * names.add("bar"); - * names.add("bob"); - * paramBuilder.addSet("prefix == ", names); - * // Expect:"(prefix == :var0 && prefix == :var1 && prefix == :var2)" - * paramBuilder.toString()); - * // paramBuilder(getArguments()) contains mapping for var0, var1 and var2 - * </pre> - * @param prefix common prefix to use for expression - * @param values - * @return this - */ - QueryParamBuilder addSet(String prefix, Iterable<String> values) { - if (values == null) { - return this; - } - - // Add expressions of the form 'prefix :var$i' - for(String name: values) { - // Append index to the varName - String vName = "var" + paramId.toString(); - addCustomParam(prefix + ":" + vName, vName, name.trim().toLowerCase()); - paramId.incrementAndGet(); - } - return this; - } - - /** - * Construct a default QueryParamBuilder joining arguments with && - */ - private QueryParamBuilder() { - this(Op.AND); - } - - /** - * Construct generic QueryParamBuilder - * @param operation join operation (AND or OR) - */ - private QueryParamBuilder(Op operation) { - this.arguments = new HashMap<>(); - this.operation = operation.toString(); - this.paramId = new AtomicLong(0); - } - - /** - * Internal constructor used for children - reuses arguments from parent - * @param parent parent element - * @param operation join operation - */ - private QueryParamBuilder(QueryParamBuilder parent, Op operation) { - this.arguments = parent.getArguments(); - this.operation = operation.toString(); - this.paramId = parent.paramId; - } - - /** - * common code for adding string values - * @param fieldName field name to add - * @param value field value - * @param preserveCase if true, do not trim and lower - * @return this - * @throws IllegalArgumentException if fieldName was already added before - */ - private QueryParamBuilder addCommon(String fieldName, String value, - boolean preserveCase) { - Object oldValue; - if 
(preserveCase) { - oldValue = arguments.put(fieldName, SentryStore.toNULLCol(SentryStore.safeTrim(value))); - } else { - oldValue = arguments.put(fieldName, SentryStore.toNULLCol(SentryStore.safeTrimLower(value))); - } - if (oldValue != null) { - // Attempt to insert the same field twice - throw new IllegalArgumentException("field " + fieldName + "already exists"); - } - queryParts.add("this." + fieldName + " == :" + fieldName); - return this; - } -}
