// http://git-wip-us.apache.org/repos/asf/sentry/blob/7db84b2f/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java new file mode 100644 index 0000000..42770df --- /dev/null +++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java @@ -0,0 +1,530 @@
/*
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

package org.apache.sentry.provider.db.service.persistent;

import org.apache.sentry.core.common.utils.PubSub;
import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;

import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME;
import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jdo.JDODataStoreException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.thrift.TException;
import org.apache.sentry.service.thrift.SentryHMSClient;
import org.apache.sentry.service.thrift.HiveConnectionFactory;
import org.apache.sentry.service.thrift.HiveNotificationFetcher;
import org.apache.sentry.api.common.SentryServiceUtil;
import org.apache.sentry.service.thrift.SentryStateBank;
import org.apache.sentry.service.thrift.SentryServiceState;
import org.apache.sentry.service.thrift.HMSFollowerState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * HMSFollower is the task that follows Hive MetaStore (HMS) state changes on behalf of Sentry.
 *
 * <p>Each {@link #run()} fetches either a full HMS snapshot or the new HMS notification events
 * and applies them through {@link SentryStore} and {@link NotificationProcessor}, which keeps
 * both the stored permissions and the {@code <obj, path>} state used for HDFS-Sentry sync
 * up to date.
 *
 * <p>The class also subscribes (when configured) to the {@link PubSub.Topic#HDFS_SYNC_HMS}
 * topic; a message on that topic forces a full snapshot on the next run.
 */
public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber {

  private static final Logger LOGGER = LoggerFactory.getLogger(HMSFollower.class);
  private static final String FULL_UPDATE_TRIGGER = "FULL UPDATE TRIGGER: ";

  /**
   * Set once a connection to HMS succeeded. Written by the follower thread and read through
   * the static test accessor, hence volatile so that the reader sees the latest value.
   */
  private static volatile boolean connectedToHms = false;

  private SentryHMSClient client;
  private final Configuration authzConf;
  private final SentryStore sentryStore;
  private final NotificationProcessor notificationProcessor;
  private boolean readyToServe;
  private final HiveNotificationFetcher notificationFetcher;
  private final boolean hdfsSyncEnabled;
  /** Raised by {@link #onMessage} to request a single forced full snapshot. */
  private final AtomicBoolean fullUpdateHMS = new AtomicBoolean(false);

  private final LeaderStatusMonitor leaderMonitor;

  /**
   * Current generation of HMS snapshots. HMSFollower is single-threaded, so no need
   * to protect against concurrent modification.
   */
  private long hmsImageId = SentryStore.EMPTY_PATHS_SNAPSHOT_ID;

  /**
   * Configures the HMS follower, resolving the authorization server name from {@code conf}.
   *
   * @param conf sentry configuration
   * @param store sentry store
   * @param leaderMonitor singleton instance of LeaderStatusMonitor
   * @param hiveConnectionFactory factory handed to the HMS client and the notification fetcher
   */
  public HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor,
      HiveConnectionFactory hiveConnectionFactory) {
    this(conf, store, leaderMonitor, hiveConnectionFactory, null);
  }

  /**
   * Constructor should be used only for testing purposes.
   *
   * @param conf sentry configuration
   * @param store sentry store
   * @param leaderMonitor leader status monitor; {@code null} is treated as "always leader"
   * @param hiveConnectionFactory factory handed to the HMS client and the notification fetcher
   * @param authServerName Server that sentry is Authorizing; when {@code null} the name is
   *     read from the configuration
   */
  @VisibleForTesting
  public HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor leaderMonitor,
      HiveConnectionFactory hiveConnectionFactory, String authServerName) {
    LOGGER.info("HMSFollower is being initialized");
    readyToServe = false;
    authzConf = conf;
    this.leaderMonitor = leaderMonitor;
    sentryStore = store;

    if (authServerName == null) {
      // Prefer the current property, fall back to the deprecated one and finally its default.
      authServerName = conf.get(AUTHZ_SERVER_NAME.getVar(),
          conf.get(AUTHZ_SERVER_NAME_DEPRECATED.getVar(), AUTHZ_SERVER_NAME_DEPRECATED.getDefault()));
    }

    notificationProcessor = new NotificationProcessor(sentryStore, authServerName, authzConf);
    client = new SentryHMSClient(authzConf, hiveConnectionFactory);
    // no cache to test different settings for hdfs sync
    hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabledNoCache(authzConf);
    notificationFetcher = new HiveNotificationFetcher(sentryStore, hiveConnectionFactory);

    // subscribe to full update notification
    if (conf.getBoolean(ServerConfig.SENTRY_SERVICE_FULL_UPDATE_PUBSUB, false)) {
      LOGGER.info(FULL_UPDATE_TRIGGER + "subscribing to topic " + PubSub.Topic.HDFS_SYNC_HMS.getName());
      PubSub.getInstance().subscribe(PubSub.Topic.HDFS_SYNC_HMS, this);
    }
    if (!hdfsSyncEnabled) {
      try {
        // Clear all the HMS metadata learned so far and learn it fresh when the feature
        // is enabled back.
        store.clearHmsPathInformation();
      } catch (Exception ex) {
        // Best effort: a failure here must not prevent the follower from being created.
        LOGGER.error("Failed to clear HMS path info", ex);
        LOGGER.error("Please manually clear data from SENTRY_PATH_CHANGE/AUTHZ_PATH/AUTHZ_PATHS_MAPPING tables. "
            + "If not, HDFS ACL's will be inconsistent when HDFS sync feature is enabled back.");
      }
    }
  }

  /**
   * @return true once any HMSFollower instance has successfully connected to HMS
   */
  @VisibleForTesting
  public static boolean isConnectedToHms() {
    return connectedToHms;
  }

  /**
   * Replaces the HMS client; intended for tests.
   *
   * @param client client to use from now on
   */
  @VisibleForTesting
  void setSentryHmsClient(SentryHMSClient client) {
    this.client = client;
  }

  /**
   * Disconnects from HMS (clearing the CONNECTED state) and closes the notification fetcher.
   * A failure to disconnect is logged and does not prevent the fetcher from being closed.
   */
  @Override
  public void close() {
    if (client != null) {
      // Close any outstanding connections to HMS
      try {
        client.disconnect();
        SentryStateBank.disableState(HMSFollowerState.COMPONENT, HMSFollowerState.CONNECTED);
      } catch (Exception failure) {
        LOGGER.error("Failed to close the Sentry Hms Client", failure);
      }
    }

    notificationFetcher.close();
  }

  /**
   * Single follower iteration: reads the last persisted notification ID, wakes up clients
   * waiting on it and, if this node is the leader, synchronizes with HMS. The STARTED state
   * is enabled for the duration of the call.
   */
  @Override
  public void run() {
    SentryStateBank.enableState(HMSFollowerState.COMPONENT, HMSFollowerState.STARTED);
    long lastProcessedNotificationId;
    try {
      try {
        // Initializing lastProcessedNotificationId based on the latest persisted notification ID.
        lastProcessedNotificationId = sentryStore.getLastProcessedNotificationID();
      } catch (Exception e) {
        LOGGER.error("Failed to get the last processed notification id from sentry store, "
            + "Skipping the processing", e);
        return;
      }
      // Wake any clients connected to this service waiting for HMS already processed notifications.
      wakeUpWaitingClientsForSync(lastProcessedNotificationId);
      // Only the leader should listen to HMS updates
      if (!isLeader()) {
        // Close any outstanding connections to HMS
        close();
        return;
      }
      syncupWithHms(lastProcessedNotificationId);
    } finally {
      SentryStateBank.disableState(HMSFollowerState.COMPONENT, HMSFollowerState.STARTED);
    }
  }

  /**
   * @return true when no leader monitor was supplied or the monitor reports leadership
   */
  private boolean isLeader() {
    return (leaderMonitor == null) || leaderMonitor.isLeader();
  }

  @VisibleForTesting
  String getAuthServerName() {
    return notificationProcessor.getAuthServerName();
  }

  /**
   * Processes new Hive Metastore notifications.
   *
   * <p>If no notifications are processed yet, then it
   * does a full initial snapshot of the Hive Metastore followed by new notifications updates that
   * could have happened after it.
   *
   * <p>Clients connections waiting for an event notification will be
   * woken up afterwards.
   *
   * <p>Failures are logged rather than propagated so that the executor running this task
   * does not halt; on a failure after connecting, the HMS connection is closed.
   *
   * @param notificationId the latest notification ID already processed by Sentry
   */
  void syncupWithHms(long notificationId) {
    try {
      client.connect();
      connectedToHms = true;
      SentryStateBank.enableState(HMSFollowerState.COMPONENT, HMSFollowerState.CONNECTED);
    } catch (Throwable e) {
      LOGGER.error("HMSFollower cannot connect to HMS!!", e);
      return;
    }

    try {
      if (hdfsSyncEnabled) {
        // Before getting notifications, checking if a full HMS snapshot is required.
        if (isFullSnapshotRequired(notificationId)) {
          createFullSnapshot();
          return;
        }
      } else if (isSentryOutOfSync(notificationId)) {
        // Out-of-sync, fetching all the notifications
        // in HMS NOTIFICATION_LOG.
        sentryStore.setLastProcessedNotificationID(0L);
        notificationId = 0L;
      }

      Collection<NotificationEvent> notifications =
          notificationFetcher.fetchNotifications(notificationId);

      // After getting notifications, check if HMS did some clean-up and notifications
      // are out-of-sync with Sentry.
      if (hdfsSyncEnabled
          && areNotificationsOutOfSync(notifications, notificationId)) {
        // Out-of-sync, taking a HMS full snapshot.
        createFullSnapshot();
        return;
      }

      if (!readyToServe) {
        // Allow users and/or applications who look into the Sentry console output to see
        // when Sentry is ready to serve.
        System.out.println("Sentry HMS support is ready");
        readyToServe = true;
      }

      // Continue with processing new notifications if no snapshots are done.
      processNotifications(notifications);
    } catch (TException e) {
      LOGGER.error("An error occurred while fetching HMS notifications: ", e);
      close();
    } catch (Throwable t) {
      // catching errors to prevent the executor to halt.
      LOGGER.error("Exception in HMSFollower! Caused by: " + t.getMessage(), t);

      close();
    }
  }

  /**
   * Checks if a new full HMS snapshot request is needed by checking if:
   * <ul>
   *   <li>Sentry HMS Notification table is EMPTY</li>
   *   <li>HDFSSync is enabled and Sentry Authz Snapshot table is EMPTY</li>
   *   <li>The current notification Id on the HMS is less than the
   *   latest processed by Sentry.</li>
   *   <li>Full Snapshot Signal is detected</li>
   * </ul>
   *
   * @param latestSentryNotificationId The notification Id to check against the HMS
   * @return True if a full snapshot is required; False otherwise.
   * @throws Exception If an error occurs while checking the SentryStore or the HMS client.
   */
  private boolean isFullSnapshotRequired(long latestSentryNotificationId) throws Exception {
    if (sentryStore.isHmsNotificationEmpty()) {
      LOGGER.debug("Sentry Store has no HMS Notifications. Create Full HMS Snapshot. "
          + "latest sentry notification Id = {}", latestSentryNotificationId);
      return true;
    }

    // Once HDFS sync is enabled, and if MAuthzPathsMapping
    // table is still empty, we need to request a full snapshot
    if (sentryStore.isAuthzPathsSnapshotEmpty()) {
      LOGGER.debug("HDFSSync is enabled and MAuthzPathsMapping table is empty."
          + " Need to request a full snapshot");
      return true;
    }

    if (isSentryOutOfSync(latestSentryNotificationId)) {
      return true;
    }

    // check if forced full update is required, reset update flag to false
    // to only do it once per forced full update request.
    if (fullUpdateHMS.compareAndSet(true, false)) {
      LOGGER.info(FULL_UPDATE_TRIGGER + "initiating full HMS snapshot request");
      return true;
    }

    return false;
  }

  /**
   * Checks the last notification processed by sentry and the current event-id of
   * HMS to see if sentry is out of sync.
   *
   * @param latestSentryNotificationId The notification Id to check against the HMS
   * @return True, sentry is out-of-sync, False otherwise
   * @throws Exception If an error occurs while fetching the current notification from HMS
   */
  private boolean isSentryOutOfSync(long latestSentryNotificationId) throws Exception {
    long currentHmsNotificationId = notificationFetcher.getCurrentNotificationId();
    if (currentHmsNotificationId < latestSentryNotificationId) {
      LOGGER.info("The current notification ID on HMS = {} is less than the latest processed Sentry "
          + "notification ID = {}. Sentry, Out-of-sync",
          currentHmsNotificationId, latestSentryNotificationId);
      return true;
    }
    return false;
  }

  /**
   * Checks if the HMS and Sentry processed notifications are out-of-sync.
   * This could happen because the HMS did some clean-up of old notifications
   * and Sentry was not requesting notifications during that time.
   *
   * @param events All new notifications to check for an out-of-sync.
   * @param latestProcessedId The latest notification processed by Sentry to check against the
   *        list of notifications events.
   * @return True if an out-of-sync is found; False otherwise.
   */
  private boolean areNotificationsOutOfSync(Collection<NotificationEvent> events,
      long latestProcessedId) {
    if (events.isEmpty()) {
      return false;
    }

    /*
     * If the sequence of notifications has a gap, then an out-of-sync might
     * have happened due to the following issue:
     *
     * - HDFS sync was disabled or Sentry was shutdown for a time period longer than
     *   the HMS notification clean-up thread causing old notifications to be deleted.
     *
     * HMS notifications may contain both gaps in the sequence and duplicates
     * (the same ID repeated more than once for different events).
     *
     * To accept duplicates (see NotificationFetcher for more info), then a gap is found
     * if the 1st notification received is higher than the current ID processed + 1.
     * i.e.
     *   1st ID = 3, latest ID = 3 (duplicate found but no gap detected)
     *   1st ID = 4, latest ID = 3 (consecutive ID found but no gap detected)
     *   1st ID = 5, latest ID = 3 (a gap is detected)
     */

    // Take the first element through the iterator instead of casting to List, so any
    // Collection implementation is accepted (the collection is known to be non-empty here).
    long firstNotificationId = events.iterator().next().getEventId();

    if (firstNotificationId > (latestProcessedId + 1)) {
      LOGGER.info("First HMS event notification Id = {} is greater than latest Sentry processed "
          + "notification Id = {} + 1. Need to request a full HMS snapshot.",
          firstNotificationId, latestProcessedId);
      return true;
    }

    return false;
  }

  /**
   * Request for full snapshot and persists it if there is no snapshot available in the sentry
   * store. Also, wakes-up any waiting clients.
   *
   * <p>The FULL_UPDATE_RUNNING service state is enabled while the snapshot is being taken and
   * is always cleared on exit.
   *
   * @return ID of last notification processed, or {@link SentryStore#EMPTY_NOTIFICATION_ID}
   *     when this node lost leadership before the snapshot could be persisted.
   * @throws Exception if there are failures
   */
  private long createFullSnapshot() throws Exception {
    LOGGER.debug("Attempting to take full HMS snapshot");
    Preconditions.checkState(!SentryStateBank.isEnabled(SentryServiceState.COMPONENT,
        SentryServiceState.FULL_UPDATE_RUNNING),
        "HMSFollower shown loading full snapshot when it should not be.");
    try {
      // Set that the full update is running
      SentryStateBank
          .enableState(SentryServiceState.COMPONENT, SentryServiceState.FULL_UPDATE_RUNNING);

      PathsImage snapshotInfo = client.getFullSnapshot();
      if (snapshotInfo.getPathImage().isEmpty()) {
        LOGGER.debug("Received empty path image from HMS while taking a full snapshot");
        return snapshotInfo.getId();
      }

      // Check we're still the leader before persisting the new snapshot
      if (!isLeader()) {
        LOGGER.info("Not persisting full snapshot since not a leader");
        return SentryStore.EMPTY_NOTIFICATION_ID;
      }
      try {
        if (hdfsSyncEnabled) {
          LOGGER.info("Persisting full snapshot for notification Id = {}", snapshotInfo.getId());
          sentryStore.persistFullPathsImage(snapshotInfo.getPathImage(), snapshotInfo.getId());
        } else {
          // We need to persist latest notificationID for next poll
          LOGGER.info("HDFSSync is disabled. Not Persisting full snapshot, "
              + "but only setting last processed notification Id = {}", snapshotInfo.getId());
          sentryStore.setLastProcessedNotificationID(snapshotInfo.getId());
        }
      } catch (Exception failure) {
        // The exception itself is logged by the caller once it propagates.
        LOGGER.error("Received exception while persisting HMS path full snapshot ");
        throw failure;
      }
      // Wake up any HMS waiters that could have been put on hold before getting the
      // eventIDBefore value.
      wakeUpWaitingClientsForSync(snapshotInfo.getId());
      // HMSFollower connected to HMS and it finished full snapshot if that was required
      LOGGER.info("Sentry HMS support is ready");
      return snapshotInfo.getId();
    } catch (Exception failure) {
      LOGGER.error("Received exception while creating HMS path full snapshot ");
      throw failure;
    } finally {
      SentryStateBank
          .disableState(SentryServiceState.COMPONENT, SentryServiceState.FULL_UPDATE_RUNNING);
    }
  }

  /**
   * Process the collection of notifications and wake up any waiting clients.
   * Also, persists the notification ID regardless of processing result.
   *
   * <p>Processing stops early when this node is no longer the leader, or when a JDO storage
   * failure is seen for an event whose ID is not newer than the persisted one.
   *
   * @param events list of event to be processed
   * @throws Exception if the complete notification list is not processed because of JDO Exception
   */
  public void processNotifications(Collection<NotificationEvent> events) throws Exception {
    boolean isNotificationProcessed;
    if (events.isEmpty()) {
      return;
    }

    for (NotificationEvent event : events) {
      isNotificationProcessed = false;
      try {
        // Only the leader should process the notifications
        if (!isLeader()) {
          LOGGER.debug("Not processing notifications since not a leader");
          return;
        }
        isNotificationProcessed = notificationProcessor.processNotificationEvent(event);
      } catch (Exception e) {
        if (e.getCause() instanceof JDODataStoreException) {
          LOGGER.info("Received JDO Storage Exception, Could be because of processing "
              + "duplicate notification");
          if (event.getEventId() <= sentryStore.getLastProcessedNotificationID()) {
            // Rest of the notifications need not be processed.
            LOGGER.error("Received event with Id: {} which is smaller then the ID "
                + "persisted in store", event.getEventId());
            break;
          }
        } else {
          // The Throwable goes last with no placeholder so SLF4J logs its stack trace.
          LOGGER.error("Processing the notification with ID:{} failed with exception",
              event.getEventId(), e);
        }
      }
      if (!isNotificationProcessed) {
        try {
          // Update the notification ID in the persistent store even when the notification is
          // not processed as the content in the notification is not valid.
          // Continue processing the next notification.
          LOGGER.debug("Explicitly Persisting Notification ID = {} ", event.getEventId());
          sentryStore.persistLastProcessedNotificationID(event.getEventId());
        } catch (Exception failure) {
          LOGGER.error("Received exception while persisting the notification ID = {}", event.getEventId());
          throw failure;
        }
      }
      // Wake up any HMS waiters that are waiting for this ID.
      wakeUpWaitingClientsForSync(event.getEventId());
    }
  }

  /**
   * Wakes up HMS waiters waiting for a specific event notification.<p>
   *
   * Verify that HMS image id didn't change since the last time we looked.
   * If id did, it is possible that notifications jumped backward, so reset
   * the counter to the current value.
   *
   * @param eventId Id of a notification
   */
  private void wakeUpWaitingClientsForSync(long eventId) {
    CounterWait counterWait = sentryStore.getCounterWait();

    LOGGER.debug("wakeUpWaitingClientsForSync: eventId = {}, hmsImageId = {}", eventId, hmsImageId);
    // Wake up any HMS waiters that are waiting for this ID.
    // counterWait should never be null, but tests mock SentryStore and a mocked one
    // doesn't have it.
    if (counterWait == null) {
      return;
    }

    long lastHMSSnapshotId = hmsImageId;
    try {
      // Read actual HMS image ID
      lastHMSSnapshotId = sentryStore.getLastProcessedImageID();
      LOGGER.debug("wakeUpWaitingClientsForSync: lastHMSSnapshotId = {}", lastHMSSnapshotId);
    } catch (Exception e) {
      // Still release the waiters for this event, then report the failure with its cause.
      counterWait.update(eventId);
      LOGGER.error("Failed to get the last processed HMS image id from sentry store", e);
      return;
    }

    // Reset the counter if the persisted image ID is greater than current image ID
    if (lastHMSSnapshotId > hmsImageId) {
      counterWait.reset(eventId);
      hmsImageId = lastHMSSnapshotId;
      LOGGER.debug("wakeUpWaitingClientsForSync: reset counterWait with eventId = {}, new hmsImageId = {}", eventId, hmsImageId);
    }

    LOGGER.debug("wakeUpWaitingClientsForSync: update counterWait with eventId = {}, hmsImageId = {}", eventId, hmsImageId);
    counterWait.update(eventId);
  }

  /**
   * PubSub.Subscriber callback API. Flags that a full HMS snapshot must be taken on the
   * next synchronization.
   *
   * @param topic must be {@link PubSub.Topic#HDFS_SYNC_HMS}
   * @param message message received with the notification; only logged
   * @throws IllegalArgumentException if {@code topic} is any other topic
   */
  @Override
  public void onMessage(PubSub.Topic topic, String message) {
    Preconditions.checkArgument(topic == PubSub.Topic.HDFS_SYNC_HMS, "Unexpected topic %s instead of %s", topic, PubSub.Topic.HDFS_SYNC_HMS);
    LOGGER.info(FULL_UPDATE_TRIGGER + "Received [{}, {}] notification", topic, message);
    fullUpdateHMS.set(true);
  }
}
// http://git-wip-us.apache.org/repos/asf/sentry/blob/7db84b2f/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/LeaderStatusMonitor.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/LeaderStatusMonitor.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/LeaderStatusMonitor.java new file mode 100644 index 0000000..c2f1ad0 --- /dev/null +++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/LeaderStatusMonitor.java @@ -0,0 +1,286 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.sentry.provider.db.service.persistent;

import com.google.common.annotations.VisibleForTesting;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.hadoop.conf.Configuration;

import javax.annotation.concurrent.ThreadSafe;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.sentry.service.common.ServiceConstants.ServerConfig.*;

/**
 * LeaderStatusMonitor participates in the distributed leader election protocol
 * and allows clients to access the global leadership status.
 * <p>
 * LeaderStatusMonitor is a singleton that uses Curator framework via
 * {@link HAContext}. The leadership status can be accessed via the
 * {@link #isLeader()} method.<p>
 *
 * Usually leadership re-election is initiated by the Curator framework when one
 * of the nodes disconnects from ZooKeeper, but LeaderStatusMonitor also supports
 * voluntary release of the leadership via the {@link #deactivate()} method. This is
 * intended to be used for debugging purposes.
 * <p>
 * The class also simulates leader election in non-HA environments. In such cases its
 * {@link #isLeader()} method always returns True. The non-HA environment is determined
 * by the absence of the SENTRY_HA_ZOOKEEPER_QUORUM in the configuration.
 *
 * <h2>Implementation notes</h2>
 *
 * <h3>Initialization</h3>
 *
 * Class initialization is split between the constructor and the {@link #init()} method.
 * There are two reasons for it:
 * <ul>
 *     <li>We do not want to pass <strong>this</strong> reference to
 *     {@link HAContext#newLeaderSelector(String, LeaderSelectorListener)}
 *     until it is fully initialized</li>
 *     <li>We do not want to call {@link LeaderSelector#start()} method in constructor</li>
 * </ul>
 *
 * Since LeaderStatusMonitor is a singleton and an instance can only be obtained via the
 * {@link #getLeaderStatusMonitor(Configuration)} method, we hide this construction split
 * from the callers.
 *
 * <h3>Synchronization</h3>
 * Singleton synchronization is achieved using the synchronized class builder
 * {@link #getLeaderStatusMonitor(Configuration)}
 * <p>
 * Upon becoming a leader, the code loops in {@link #takeLeadership(CuratorFramework)}
 * until it receives a deactivation request from {@link #deactivate()}. This is synchronized
 * using a {@link #lock}, the condition variable {@link #cond} and the
 * {@link #deactivationRequested} flag, which is the predicate the leader waits on so that
 * a spurious wakeup of the condition does not release leadership.
 * <p>
 * Access to the leadership status {@link #isLeader} is also protected by the {@link #lock}.
 * This isn't strictly necessary and a volatile field would be sufficient, but since we
 * already use the {@link #lock} this is more straightforward.
 */
@ThreadSafe
public final class LeaderStatusMonitor
      extends LeaderSelectorListenerAdapter implements AutoCloseable {

  private static final Logger LOGGER = LoggerFactory.getLogger(LeaderStatusMonitor.class);

  private static final String LEADER_SELECTOR_SUFFIX = "leader";

  /** Unique instance of the singleton object */
  private static LeaderStatusMonitor leaderStatusMonitor = null;

  private final HAContext haContext;

  /** Unique string describing this instance */
  private final String defaultIncarnationId = generateIncarnationId();
  private String incarnationId;

  /** True when not using ZooKeeper */
  private final boolean isSingleNodeMode;

  /** Lock and condition used to signal the leader to voluntary release leadership */
  private final Lock lock = new ReentrantLock();
  /** Condition variable used to synchronize voluntary leadership release */
  private final Condition cond = lock.newCondition();
  /** Leadership status - true if leader. Guarded by {@link #lock}. */
  private boolean isLeader = false;
  /** True once {@link #deactivate()} asked the current leader to resign. Guarded by {@link #lock}. */
  private boolean deactivationRequested = false;

  /** Curator framework leader monitor */
  private LeaderSelector leaderSelector = null;

  /** The number of times this incarnation has become the leader. */
  private final AtomicLong leaderCount = new AtomicLong(0);

  /**
   * Constructor. Initialize state and create HA context if configuration
   * specifies ZooKeeper servers.
   * @param conf Configuration. The fields we are interested in are:
   *             <ul>
   *             <li>SENTRY_HA_ZOOKEEPER_QUORUM</li>
   *             </ul>
   *             Configuration is also passed to the
   *             {@link HAContext#newLeaderSelector(String, LeaderSelectorListener)}
   *             which uses more properties.
   * @throws Exception if the HA context cannot be obtained
   */
  @VisibleForTesting
  protected LeaderStatusMonitor(Configuration conf) throws Exception {
    // Only enable HA configuration if zookeeper is configured
    String zkServers = conf.get(SENTRY_HA_ZOOKEEPER_QUORUM, "");
    if (zkServers.isEmpty()) {
      isSingleNodeMode = true;
      haContext = null;
      isLeader = true;
      incarnationId = "";
      LOGGER.info("Leader election protocol disabled, assuming single active server");
      return;
    }
    isSingleNodeMode = false;
    incarnationId = defaultIncarnationId;
    haContext = HAContext.getHAServerContext(conf);

    LOGGER.info("Created LeaderStatusMonitor(incarnationId={}, "
        + "zkServers='{}')", incarnationId, zkServers);
  }

  /**
   * Tests may need to provide custom incarnation ID
   * @param conf configuration
   * @param incarnationId custom incarnation ID
   * @throws Exception if the HA context cannot be obtained
   */
  @VisibleForTesting
  protected LeaderStatusMonitor(Configuration conf, String incarnationId) throws Exception {
    this(conf);
    this.incarnationId = incarnationId;
  }

  /**
   * Second half of the constructor links this object with {@link HAContext} and
   * starts leader election protocol. No-op in single-node mode.
   */
  @VisibleForTesting
  protected void init() {
    if (isSingleNodeMode) {
      return;
    }

    leaderSelector = haContext.newLeaderSelector("/" + LEADER_SELECTOR_SUFFIX, this);
    leaderSelector.setId(incarnationId);
    leaderSelector.autoRequeue();
    leaderSelector.start();
  }

  /**
   * Returns the singleton, creating and initializing it on first use.
   *
   * @param conf Configuration. See {@link #LeaderStatusMonitor(Configuration)} for details.
   *             Only used by the call that creates the instance.
   * @return A global LeaderStatusMonitor instance.
   * @throws Exception if the instance cannot be created
   */
  @SuppressWarnings("LawOfDemeter")
  public static synchronized LeaderStatusMonitor getLeaderStatusMonitor(Configuration conf)
      throws Exception {
    if (leaderStatusMonitor == null) {
      leaderStatusMonitor = new LeaderStatusMonitor(conf);
      leaderStatusMonitor.init();
    }
    return leaderStatusMonitor;
  }

  /**
   * @return number of times this leader was elected. Used for metrics.
   */
  public long getLeaderCount() {
    return leaderCount.get();
  }

  /**
   * Shuts down the Curator leader selector, if one was started. In single-node mode no
   * selector exists and this is a no-op.
   */
  @Override
  public void close() {
    if (leaderSelector != null) {
      // Shut down our Curator hooks.
      leaderSelector.close();
    }
  }

  /**
   * Deactivate the current client, if it is active.
   * In non-HA case this is a no-op, as is calling it while this node is not the leader.
   */
  public void deactivate() {
    if (isSingleNodeMode) {
      return;
    }
    lock.lock();
    try {
      // Only a thread parked in takeLeadership() can act on the request; recording it
      // while not leader would make the next election resign immediately.
      if (isLeader) {
        deactivationRequested = true;
        cond.signal();
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * @return true iff we are the leader.
   * In non-HA case always returns true
   */
  public boolean isLeader() {
    if (isSingleNodeMode) {
      return true;
    }
    lock.lock();
    try {
      return isLeader;
    } finally {
      lock.unlock();
    }
  }

  /**
   * Curator framework callback which is called when we become a leader.
   * Should return only when we decide to resign: it blocks until {@link #deactivate()}
   * is called or the thread is interrupted.
   *
   * @param client Curator client; not used by this implementation
   */
  @Override
  public void takeLeadership(CuratorFramework client) throws Exception {
    leaderCount.incrementAndGet();
    LOGGER.info("Becoming leader in Sentry HA cluster:{}", this);
    lock.lock();
    try {
      isLeader = true;
      deactivationRequested = false;
      // Condition.await() may return spuriously, so wait on the explicit predicate
      // rather than on a single await() call.
      while (!deactivationRequested) {
        cond.await();
      }
    } catch (InterruptedException e) {
      // Restore the interrupt status for the Curator thread that invoked us.
      Thread.currentThread().interrupt();
      LOGGER.error("takeLeadership call interrupted:" + this, e);
    } finally {
      isLeader = false;
      lock.unlock();
      LOGGER.info("Resigning from leader status in a Sentry HA cluster:{}", this);
    }
  }

  /**
   * Generate ID for the activator. <p>
   *
   * Ideally we would like something like host@pid, but Java doesn't provide a good
   * way to determine pid value, so we use
   * {@link RuntimeMXBean#getName()} which usually contains host
   * name and pid.
   */
  private static String generateIncarnationId() {
    return ManagementFactory.getRuntimeMXBean().getName();
  }

  @Override
  public String toString() {
    if (isSingleNodeMode) {
      return "Leader election disabled";
    }
    // Read the leadership flag through isLeader() so the access is made under the lock
    // (the lock is reentrant, so this is safe when called by the thread holding it).
    return String.format("{isSingleNodeMode=%b, incarnationId=%s, isLeader=%b, leaderCount=%d}",
        isSingleNodeMode, incarnationId, isLeader(), leaderCount.longValue());
  }

}
// http://git-wip-us.apache.org/repos/asf/sentry/blob/7db84b2f/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/NotificationProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/NotificationProcessor.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/NotificationProcessor.java new file mode 100644 index 0000000..a771f4b --- /dev/null +++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/NotificationProcessor.java @@ -0,0 +1,777 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.sentry.provider.db.service.persistent; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.codahale.metrics.Timer.Context; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAddPartitionMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterPartitionMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONAlterTableMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateDatabaseMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONCreateTableMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropDatabaseMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropPartitionMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONDropTableMessage; +import org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageDeserializer; +import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException; +import org.apache.sentry.core.common.exception.SentryInvalidInputException; +import org.apache.sentry.core.common.exception.SentryNoSuchObjectException; +import org.apache.sentry.core.common.utils.PathUtils; +import org.apache.sentry.hdfs.PathsUpdate; +import org.apache.sentry.hdfs.PermissionsUpdate; +import org.apache.sentry.hdfs.SentryMalformedPathException; +import org.apache.sentry.hdfs.UniquePathsUpdate; +import org.apache.sentry.hdfs.Updateable.Update; +import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges; +import org.apache.sentry.api.service.thrift.SentryMetrics; 
+import org.apache.sentry.api.service.thrift.TSentryAuthorizable; +import org.apache.sentry.api.common.SentryServiceUtil; +import org.apache.sentry.hdfs.service.thrift.TPrivilegeEntityType; +import org.apache.sentry.hdfs.service.thrift.TPrivilegeEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; + +import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE; +import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_DROP_WITH_POLICY_STORE; + + + +/** + * NotificationProcessor processes various notification events generated from + * the Hive MetaStore state change, and applies these changes to the complete + * HMS Paths snapshot or delta update stored in Sentry using SentryStore. + * + * <p>NotificationProcessor should not skip processing notification events for any reason. + * If some notification events are to be skipped, appropriate logic should be added in + * HMSFollower before invoking NotificationProcessor. + */ +final class NotificationProcessor { + + private static final Logger LOGGER = LoggerFactory.getLogger(NotificationProcessor.class); + private final SentryStore sentryStore; + private final SentryJSONMessageDeserializer deserializer; + private final String authServerName; + // These variables can be updated even after object is instantiated, for testing purposes. + private boolean syncStoreOnCreate = false; + private boolean syncStoreOnDrop = false; + private final boolean hdfsSyncEnabled; + + /** + * Configuring notification processor. 
+ * + * @param sentryStore sentry backend store + * @param authServerName Server that sentry is authorizing + * @param conf sentry configuration + */ + NotificationProcessor(SentryStore sentryStore, String authServerName, + Configuration conf) { + this.sentryStore = sentryStore; + deserializer = new SentryJSONMessageDeserializer(); + this.authServerName = authServerName; + syncStoreOnCreate = Boolean + .parseBoolean(conf.get(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE.getVar(), + AUTHZ_SYNC_CREATE_WITH_POLICY_STORE.getDefault())); + syncStoreOnDrop = Boolean.parseBoolean(conf.get(AUTHZ_SYNC_DROP_WITH_POLICY_STORE.getVar(), + AUTHZ_SYNC_DROP_WITH_POLICY_STORE.getDefault())); + hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabledNoCache(conf); + } + + /** + * Split path into components on the "/" character. + * The path should not start with "/". + * This is consumed by Thrift interface, so the return result should be + * {@code List<String>} + * + * @param path input path, e.g. {@code foo/bar} + * @return list of components, e.g. [foo, bar] + */ + private static List<String> splitPath(String path) { + return Lists.newArrayList(PathUtils.splitPath(path)); + } + + /** + * Constructs permission update to be persisted for drop event that can be persisted + * from thrift object. + * + * @param authorizable thrift object that is dropped. 
+ * @return update to be persisted + * @throws SentryInvalidInputException if the required fields are not set in the argument provided + */ + @VisibleForTesting + static Update getPermUpdatableOnDrop(TSentryAuthorizable authorizable) + throws SentryInvalidInputException { + PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false); + String authzObj = SentryServiceUtil.getAuthzObj(authorizable); + update.addPrivilegeUpdate(authzObj) + .putToDelPrivileges(new TPrivilegeEntity(TPrivilegeEntityType.ROLE, PermissionsUpdate.ALL_ROLES), + PermissionsUpdate.ALL_ROLES); + return update; + } + + @VisibleForTesting + String getAuthServerName() { + return authServerName; + } + + /** + * Constructs permission update to be persisted for rename event that can be persisted from thrift + * object. + * + * @param oldAuthorizable old thrift object + * @param newAuthorizable new thrift object + * @return update to be persisted + * @throws SentryInvalidInputException if the required fields are not set in the arguments provided + */ + @VisibleForTesting + static Update getPermUpdatableOnRename(TSentryAuthorizable oldAuthorizable, + TSentryAuthorizable newAuthorizable) + throws SentryInvalidInputException { + String oldAuthz = SentryServiceUtil.getAuthzObj(oldAuthorizable); + String newAuthz = SentryServiceUtil.getAuthzObj(newAuthorizable); + PermissionsUpdate update = new PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false); + TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS); + privUpdate.putToAddPrivileges(new TPrivilegeEntity(TPrivilegeEntityType.AUTHZ_OBJ, newAuthz), newAuthz); + privUpdate.putToDelPrivileges(new TPrivilegeEntity(TPrivilegeEntityType.AUTHZ_OBJ, oldAuthz), oldAuthz); + return update; + } + + /** + * This function is only used for testing purposes. 
+ * + * @param value to be set + */ + @SuppressWarnings("SameParameterValue") + @VisibleForTesting + void setSyncStoreOnCreate(boolean value) { + syncStoreOnCreate = value; + } + + /** + * This function is only used for testing purposes. + * + * @param value to be set + */ + @SuppressWarnings("SameParameterValue") + @VisibleForTesting + void setSyncStoreOnDrop(boolean value) { + syncStoreOnDrop = value; + } + + /** + * Processes the event and persist to sentry store. + * + * @param event to be processed + * @return true, if the event is persisted to sentry store. false, if the event is not persisted. + * @throws Exception if there is an error processing the event. + */ + boolean processNotificationEvent(NotificationEvent event) throws Exception { + LOGGER + .debug("Processing event with id:{} and Type:{}", event.getEventId(), event.getEventType()); + + // Expose time used for each request time as a metric. + // We use lower-case version of the event name. + EventType eventType = EventType.valueOf(event.getEventType()); + Timer timer = SentryMetrics + .getInstance() + .getTimer(MetricRegistry.name(HMSFollower.class, eventType.toString().toLowerCase())); + + try (Context ignored = timer.time()) { + switch (eventType) { + case CREATE_DATABASE: + return processCreateDatabase(event); + case DROP_DATABASE: + return processDropDatabase(event); + case CREATE_TABLE: + return processCreateTable(event); + case DROP_TABLE: + return processDropTable(event); + case ALTER_TABLE: + return processAlterTable(event); + case ADD_PARTITION: + return processAddPartition(event); + case DROP_PARTITION: + return processDropPartition(event); + case ALTER_PARTITION: + return processAlterPartition(event); + default: + LOGGER.error("Notification with ID:{} has invalid event type: {}", event.getEventId(), + event.getEventType()); + return false; + } + } + } + + /** + * Processes "create database" notification event, and applies its corresponding + * snapshot change as well as delta path update 
into Sentry DB. + * + * @param event notification event to be processed. + * @throws Exception if encounters errors while persisting the path change + */ + private boolean processCreateDatabase(NotificationEvent event) throws Exception { + SentryJSONCreateDatabaseMessage message = + deserializer.getCreateDatabaseMessage(event.getMessage()); + String dbName = message.getDB(); + String location = message.getLocation(); + if ((dbName == null) || (location == null)) { + LOGGER.warn("Create database event " + + "has incomplete information. dbName: {} location: {}", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(location, "null")); + return false; + } + + if (syncStoreOnCreate) { + dropSentryDbPrivileges(dbName, event); + } + + if (hdfsSyncEnabled) { + List<String> locations = Collections.singletonList(location); + addPaths(dbName, locations, event); + + return true; + } + + return false; + } + + /** + * Processes "drop database" notification event, and applies its corresponding + * snapshot change as well as delta path update into Sentry DB. + * + * @param event notification event to be processed. 
+ * @throws Exception if encounters errors while persisting the path change + */ + private boolean processDropDatabase(NotificationEvent event) throws Exception { + SentryJSONDropDatabaseMessage dropDatabaseMessage = + deserializer.getDropDatabaseMessage(event.getMessage()); + String dbName = dropDatabaseMessage.getDB(); + String location = dropDatabaseMessage.getLocation(); + if (dbName == null) { + LOGGER.warn("Drop database event has incomplete information: dbName = null"); + return false; + } + if (syncStoreOnDrop) { + dropSentryDbPrivileges(dbName, event); + } + + if (hdfsSyncEnabled) { + List<String> locations = Collections.singletonList(location); + removePaths(dbName, locations, event); + return true; + } + return false; + } + + /** + * Processes "create table" notification event, and applies its corresponding + * snapshot change as well as delta path update into Sentry DB. + * + * @param event notification event to be processed. + * @throws Exception if encounters errors while persisting the path change + */ + private boolean processCreateTable(NotificationEvent event) + throws Exception { + SentryJSONCreateTableMessage createTableMessage = deserializer + .getCreateTableMessage(event.getMessage()); + String dbName = createTableMessage.getDB(); + String tableName = createTableMessage.getTable(); + String location = createTableMessage.getLocation(); + if ((dbName == null) || (tableName == null) || (location == null)) { + LOGGER.warn(String.format("Create table event " + "has incomplete information." 
+ + " dbName = %s, tableName = %s, location = %s", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(tableName, "null"), + StringUtils.defaultIfBlank(location, "null"))); + return false; + } + if (syncStoreOnCreate) { + dropSentryTablePrivileges(dbName, tableName, event); + } + + if (hdfsSyncEnabled) { + String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName); + List<String> locations = Collections.singletonList(location); + addPaths(authzObj, locations, event); + return true; + } + + return false; + } + + /** + * Processes "drop table" notification event. It drops all partitions belongs to + * the table as well. And applies its corresponding snapshot change as well + * as delta path update into Sentry DB. + * + * @param event notification event to be processed. + * @throws Exception if encounters errors while persisting the path change + */ + private boolean processDropTable(NotificationEvent event) throws Exception { + SentryJSONDropTableMessage dropTableMessage = deserializer + .getDropTableMessage(event.getMessage()); + String dbName = dropTableMessage.getDB(); + String tableName = dropTableMessage.getTable(); + if ((dbName == null) || (tableName == null)) { + LOGGER.warn("Drop table event " + + "has incomplete information. dbName: {}, tableName: {}", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(tableName, "null")); + return false; + } + if (syncStoreOnDrop) { + dropSentryTablePrivileges(dbName, tableName, event); + } + + if (hdfsSyncEnabled) { + String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName); + removeAllPaths(authzObj, event); + return true; + } + + return false; + } + + /** + * Processes "alter table" notification event, and applies its corresponding + * snapshot change as well as delta path update into Sentry DB. + * + * @param event notification event to be processed. 
+ * @throws Exception if encounters errors while persisting the path change + */ + private boolean processAlterTable(NotificationEvent event) throws Exception { + + SentryJSONAlterTableMessage alterTableMessage = + deserializer.getAlterTableMessage(event.getMessage()); + String oldDbName = alterTableMessage.getDB(); + String oldTableName = alterTableMessage.getTable(); + String newDbName = event.getDbName(); + String newTableName = event.getTableName(); + String oldLocation = alterTableMessage.getOldLocation(); + String newLocation = alterTableMessage.getNewLocation(); + + if ((oldDbName == null) + || (oldTableName == null) + || (newDbName == null) + || (newTableName == null) + || (oldLocation == null) + || (newLocation == null)) { + LOGGER.warn(String.format("Alter table notification ignored since event " + + "has incomplete information. oldDbName = %s, oldTableName = %s, oldLocation = %s, " + + "newDbName = %s, newTableName = %s, newLocation = %s", + StringUtils.defaultIfBlank(oldDbName, "null"), + StringUtils.defaultIfBlank(oldTableName, "null"), + StringUtils.defaultIfBlank(oldLocation, "null"), + StringUtils.defaultIfBlank(newDbName, "null"), + StringUtils.defaultIfBlank(newTableName, "null"), + StringUtils.defaultIfBlank(newLocation, "null"))); + return false; + } + + if ((oldDbName.equals(newDbName)) + && (oldTableName.equals(newTableName)) + && (oldLocation.equals(newLocation))) { + LOGGER.debug(String.format("Alter table notification ignored as neither name nor " + + "location has changed: oldAuthzObj = %s, oldLocation = %s, newAuthzObj = %s, " + + "newLocation = %s", oldDbName + "." + oldTableName, oldLocation, + newDbName + "." 
+ newTableName, newLocation)); + return false; + } + + if (!newDbName.equalsIgnoreCase(oldDbName) || !oldTableName.equalsIgnoreCase(newTableName)) { + // Name has changed + try { + renamePrivileges(oldDbName, oldTableName, newDbName, newTableName); + } catch (SentryNoSuchObjectException e) { + LOGGER.debug("Rename Sentry privilege ignored as there are no privileges on the table:" + " {}.{}", oldDbName, oldTableName); + } catch (Exception e) { + LOGGER.info("Could not process Alter table event. Event: {}", event.toString(), e); + return false; + } + } + + if (!hdfsSyncEnabled) { + return false; + } + String oldAuthzObj = oldDbName + "." + oldTableName; + String newAuthzObj = newDbName + "." + newTableName; + renameAuthzPath(oldAuthzObj, newAuthzObj, oldLocation, newLocation, event); + return true; + } + + /** + * Processes "add partition" notification event, and applies its corresponding + * snapshot change as well as delta path update into Sentry DB. + * + * @param event notification event to be processed. + * @return true if the event was applied via addPaths; false if HDFS sync is disabled + * or the event lacks dbName, tableName or locations + * @throws Exception if encounters errors while persisting the path change + */ + private boolean processAddPartition(NotificationEvent event) + throws Exception { + if (!hdfsSyncEnabled) { + return false; + } + + SentryJSONAddPartitionMessage addPartitionMessage = + deserializer.getAddPartitionMessage(event.getMessage()); + String dbName = addPartitionMessage.getDB(); + String tableName = addPartitionMessage.getTable(); + List<String> locations = addPartitionMessage.getLocations(); + if ((dbName == null) || (tableName == null) || (locations == null)) { + LOGGER.warn(String.format("Add partition event has incomplete information. " + "dbName = %s, tableName = %s, locations = %s", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(tableName, "null"), + locations != null ? 
locations.toString() : "null")); + return false; + } + String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName); + addPaths(authzObj, locations, event); + return true; + } + + /** + * Processes "drop partition" notification event, and applies its corresponding + * snapshot change as well as delta path update into Sentry DB. + * + * @param event notification event to be processed. + * @throws Exception if encounters errors while persisting the path change + */ + private boolean processDropPartition(NotificationEvent event) + throws Exception { + if (!hdfsSyncEnabled) { + return false; + } + + SentryJSONDropPartitionMessage dropPartitionMessage = + deserializer.getDropPartitionMessage(event.getMessage()); + String dbName = dropPartitionMessage.getDB(); + String tableName = dropPartitionMessage.getTable(); + List<String> locations = dropPartitionMessage.getLocations(); + if ((dbName == null) || (tableName == null) || (locations == null)) { + LOGGER.warn(String.format("Drop partition event " + + "has incomplete information. dbName = %s, tableName = %s, location = %s", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(tableName, "null"), + locations != null ? locations.toString() : "null")); + return false; + } + String authzObj = SentryServiceUtil.getAuthzObj(dbName, tableName); + removePaths(authzObj, locations, event); + return true; + } + + /** + * Processes "alter partition" notification event, and applies its corresponding + * snapshot change as well as delta path update into Sentry DB. + * + * @param event notification event to be processed. 
+ * @return true if the location change was applied via renameAuthzPath; false if HDFS sync + * is disabled, the event lacks required fields, or the location is unchanged + * @throws Exception if encounters errors while persisting the path change + */ + private boolean processAlterPartition(NotificationEvent event) throws Exception { + if (!hdfsSyncEnabled) { + return false; + } + + SentryJSONAlterPartitionMessage alterPartitionMessage = + deserializer.getAlterPartitionMessage(event.getMessage()); + String dbName = alterPartitionMessage.getDB(); + String tableName = alterPartitionMessage.getTable(); + String oldLocation = alterPartitionMessage.getOldLocation(); + String newLocation = alterPartitionMessage.getNewLocation(); + + if ((dbName == null) + || (tableName == null) + || (oldLocation == null) + || (newLocation == null)) { + LOGGER.warn(String.format("Alter partition event " + "has incomplete information. dbName = %s, tableName = %s, " + "oldLocation = %s, newLocation = %s", + StringUtils.defaultIfBlank(dbName, "null"), + StringUtils.defaultIfBlank(tableName, "null"), + StringUtils.defaultIfBlank(oldLocation, "null"), + StringUtils.defaultIfBlank(newLocation, "null"))); + return false; + } + + if (oldLocation.equals(newLocation)) { + LOGGER.debug(String.format("Alter partition notification ignored as " + "location has not changed: AuthzObj = %s, Location = %s", dbName + "." + tableName, oldLocation)); + return false; + } + + String oldAuthzObj = dbName + "." + tableName; + renameAuthzPath(oldAuthzObj, oldAuthzObj, oldLocation, newLocation, event); + return true; + } + + /** + * Adds an authzObj along with a set of paths into the authzObj -> [Paths] mapping + * as well as persist the corresponding delta path change to Sentry DB. 
+ * + * @param authzObj the given authzObj + * @param locations a set of paths need to be added + * @param event the NotificationEvent object from where authzObj and locations were obtained + */ + private void addPaths(String authzObj, Collection<String> locations, NotificationEvent event) + throws Exception { + // AuthzObj is case insensitive + authzObj = authzObj.toLowerCase(); + + UniquePathsUpdate update = new UniquePathsUpdate(event, false); + Collection<String> paths = new HashSet<>(locations.size()); + // addPath and persist into Sentry DB. + // Skip update if encounter malformed path. + for (String location : locations) { + String pathTree = getPath(location); + if (pathTree == null) { + LOGGER.debug("HMS Path Update [" + + "OP : addPath, " + + "authzObj : " + authzObj + ", " + + "path : " + location + "] - nothing to add" + ", " + + "notification event ID: " + event.getEventId() + "]"); + } else { + LOGGER.debug("HMS Path Update [" + + "OP : addPath, " + "authzObj : " + + authzObj + ", " + + "path : " + location + ", " + + "notification event ID: " + event.getEventId() + "]"); + update.newPathChange(authzObj).addToAddPaths(splitPath(pathTree)); + paths.add(pathTree); + } + } + sentryStore.addAuthzPathsMapping(authzObj, paths, update); + } + + /** + * Removes a set of paths map to a given authzObj from the authzObj -> [Paths] mapping + * as well as persist the corresponding delta path change to Sentry DB. 
+ * + * @param authzObj the given authzObj + * @param locations a set of paths need to be removed + * @param event the NotificationEvent object from where authzObj and locations were obtained + */ + private void removePaths(String authzObj, Collection<String> locations, NotificationEvent event) + throws Exception { + // AuthzObj is case insensitive + authzObj = authzObj.toLowerCase(); + + UniquePathsUpdate update = new UniquePathsUpdate(event, false); + Collection<String> paths = new HashSet<>(locations.size()); + for (String location : locations) { + String pathTree = getPath(location); + if (pathTree == null) { + LOGGER.debug("HMS Path Update [" + + "OP : removePath, " + + "authzObj : " + authzObj + ", " + + "path : " + location + "] - nothing to remove" + ", " + + "notification event ID: " + event.getEventId() + "]"); + } else { + LOGGER.debug("HMS Path Update [" + + "OP : removePath, " + + "authzObj : " + authzObj + ", " + + "path : " + location + ", " + + "notification event ID: " + event.getEventId() + "]"); + update.newPathChange(authzObj).addToDelPaths(splitPath(pathTree)); + paths.add(pathTree); + } + } + sentryStore.deleteAuthzPathsMapping(authzObj, paths, update); + } + + /** + * Removes a given authzObj and all paths belongs to it from the + * authzObj -> [Paths] mapping as well as persist the corresponding + * delta path change to Sentry DB. 
+ * + * @param authzObj the given authzObj to be deleted + * @param event the NotificationEvent object from where authzObj and locations were obtained + */ + private void removeAllPaths(String authzObj, NotificationEvent event) + throws Exception { + // AuthzObj is case insensitive + authzObj = authzObj.toLowerCase(); + + LOGGER.debug("HMS Path Update [" + + "OP : removeAllPaths, " + + "authzObj : " + authzObj + ", " + + "notification event ID: " + event.getEventId() + "]"); + UniquePathsUpdate update = new UniquePathsUpdate(event, false); + update.newPathChange(authzObj).addToDelPaths( + Lists.newArrayList(PathsUpdate.ALL_PATHS)); + sentryStore.deleteAllAuthzPathsMapping(authzObj, update); + } + + /** + * Renames a given authzObj and alter the paths belongs to it from the + * authzObj -> [Paths] mapping as well as persist the corresponding + * delta path change to Sentry DB. + * + * @param oldAuthzObj the existing authzObj + * @param newAuthzObj the new name to be changed to + * @param oldLocation a existing path of the given authzObj + * @param newLocation a new path to be changed to + * @param event the NotificationEvent object from where authzObj and locations were obtained + */ + private void renameAuthzPath(String oldAuthzObj, String newAuthzObj, String oldLocation, + String newLocation, NotificationEvent event) throws Exception { + // AuthzObj is case insensitive + oldAuthzObj = oldAuthzObj.toLowerCase(); + newAuthzObj = newAuthzObj.toLowerCase(); + String oldPathTree = getPath(oldLocation); + String newPathTree = getPath(newLocation); + + LOGGER.debug("HMS Path Update [" + + "OP : renameAuthzObject, " + + "oldAuthzObj : " + oldAuthzObj + ", " + + "newAuthzObj : " + newAuthzObj + ", " + + "oldLocation : " + oldLocation + ", " + + "newLocation : " + newLocation + ", " + + "notification event ID: " + event.getEventId() + "]"); + + // In the case of HiveObj name has changed + if (!oldAuthzObj.equalsIgnoreCase(newAuthzObj)) { + // Skip update if encounter 
malformed path for both oldLocation and newLocation. + if ((oldPathTree != null) && (newPathTree != null)) { + UniquePathsUpdate update = new UniquePathsUpdate(event, false); + update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree)); + update.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree)); + if (oldLocation.equals(newLocation)) { + //Only name has changed + // - Alter table rename for an external table + sentryStore.renameAuthzObj(oldAuthzObj, newAuthzObj, update); + } else { + // Both name and location has changed + // - Alter table rename for managed table + sentryStore.renameAuthzPathsMapping(oldAuthzObj, newAuthzObj, oldPathTree, + newPathTree, update); + } + } else { + updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newAuthzObj, newPathTree, event); + } + } else if (!oldLocation.equals(newLocation)) { + // Only Location has changed, e.g. Alter table set location + if ((oldPathTree != null) && (newPathTree != null)) { + UniquePathsUpdate update = new UniquePathsUpdate(event, false); + update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree)); + update.newPathChange(oldAuthzObj).addToAddPaths(splitPath(newPathTree)); + sentryStore.updateAuthzPathsMapping(oldAuthzObj, oldPathTree, + newPathTree, update); + } else { + updateAuthzPathsMapping(oldAuthzObj, oldPathTree, newAuthzObj, newPathTree,event); + } + } else { + // This code should not be hit as appropriate checks are performed by the callers of this method. 
+ LOGGER.error("Update Notification for Auhorizable object {}, with no change, skipping", + oldAuthzObj); + throw new SentryInvalidHMSEventException("Update Notification for Authorizable object" + + "with no change"); + } + } + + private void updateAuthzPathsMapping(String oldAuthzObj, String oldPathTree, + String newAuthzObj, String newPathTree, NotificationEvent event) throws Exception { + if (oldPathTree != null) { + UniquePathsUpdate update = new UniquePathsUpdate(event, false); + update.newPathChange(oldAuthzObj).addToDelPaths(splitPath(oldPathTree)); + sentryStore.deleteAuthzPathsMapping(oldAuthzObj, + Collections.singleton(oldPathTree), + update); + } else if (newPathTree != null) { + UniquePathsUpdate update = new UniquePathsUpdate(event, false); + update.newPathChange(newAuthzObj).addToAddPaths(splitPath(newPathTree)); + sentryStore.addAuthzPathsMapping(newAuthzObj, + Collections.singleton(newPathTree), + update); + } + + } + + /** + * Get path tree from a given path. It return null if encounters + * SentryMalformedPathException which indicates a malformed path. + * + * @param path a path + * @return the path tree given a path. + */ + private String getPath(String path) { + try { + return PathsUpdate.parsePath(path); + } catch (SentryMalformedPathException e) { + LOGGER.error("Unexpected path while parsing {}", path, e); + } + return null; + } + + private void dropSentryDbPrivileges(String dbName, NotificationEvent event) { + try { + TSentryAuthorizable authorizable = new TSentryAuthorizable(authServerName); + authorizable.setDb(dbName); + sentryStore.dropPrivilege(authorizable, getPermUpdatableOnDrop(authorizable)); + } catch (SentryNoSuchObjectException e) { + LOGGER.debug("Drop Sentry privilege ignored as there are no privileges on the database: {}", + dbName); + } catch (Exception e) { + LOGGER.error("Could not process Drop database event." 
+ "Event: " + event.toString(), e); + } + } + + private void dropSentryTablePrivileges(String dbName, String tableName, + NotificationEvent event) { + try { + TSentryAuthorizable authorizable = new TSentryAuthorizable(authServerName); + authorizable.setDb(dbName); + authorizable.setTable(tableName); + sentryStore.dropPrivilege(authorizable, getPermUpdatableOnDrop(authorizable)); + } catch (SentryNoSuchObjectException e) { + LOGGER.debug("Drop Sentry privilege ignored as there are no privileges on the table: {}.{}", + dbName, tableName); + } catch (Exception e) { + LOGGER.error("Could not process Drop table event. Event: " + event.toString(), e); + } + } + + private void renamePrivileges(String oldDbName, String oldTableName, String newDbName, + String newTableName) throws + Exception { + TSentryAuthorizable oldAuthorizable = new TSentryAuthorizable(authServerName); + oldAuthorizable.setDb(oldDbName); + oldAuthorizable.setTable(oldTableName); + TSentryAuthorizable newAuthorizable = new TSentryAuthorizable(authServerName); + newAuthorizable.setDb(newDbName); + newAuthorizable.setTable(newTableName); + Update update = + getPermUpdatableOnRename(oldAuthorizable, newAuthorizable); + sentryStore.renamePrivilege(oldAuthorizable, newAuthorizable, update); + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/7db84b2f/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java new file mode 100644 index 0000000..9813a5a --- /dev/null +++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.provider.db.service.persistent; + +import java.util.Collection; +import java.util.Map; + +/** + * A container for complete hive paths snapshot. + * <p> + * It is composed by a hiveObj to Paths mapping, a paths image ID and the sequence number/change ID + * of latest delta change that the snapshot maps to. + */ +public class PathsImage { + + // A full image of hiveObj to Paths mapping. 
+ private final Map<String, Collection<String>> pathImage; + private final long id; + private final long curImgNum; + + public PathsImage(Map<String, Collection<String>> pathImage, long id, long curImgNum) { + this.pathImage = pathImage; + this.id = id; + this.curImgNum = curImgNum; + } + + public long getId() { + return id; + } + + public long getCurImgNum() { + return curImgNum; + } + + public Map<String, Collection<String>> getPathImage() { + return pathImage; + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/7db84b2f/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/PermissionsImage.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/PermissionsImage.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/PermissionsImage.java new file mode 100644 index 0000000..4a02db2 --- /dev/null +++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/PermissionsImage.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.provider.db.service.persistent; + +import org.apache.sentry.hdfs.service.thrift.TPrivilegeEntity; + +import java.util.List; +import java.util.Map; + +/** + * A container for complete sentry permission snapshot. + * <p> + * It is composed by a role to groups mapping, and hiveObj to < role/user, privileges > mapping. + * It also has the sequence number/change ID of latest delta change that the snapshot maps to. + */ +public class PermissionsImage { + + // A full snapshot of sentry role to groups mapping. + private final Map<String, List<String>> roleImage; + + // A full snapshot of hiveObj to <role/user, privileges> mapping. + private final Map<String, Map<TPrivilegeEntity, String>> privilegeImage; + private final long curSeqNum; + + public PermissionsImage(Map<String, List<String>> roleImage, + Map<String, Map<TPrivilegeEntity, String>> privilegeImage, long curSeqNum) { + this.roleImage = roleImage; + this.privilegeImage = privilegeImage; + this.curSeqNum = curSeqNum; + } + + public long getCurSeqNum() { + return curSeqNum; + } + + public Map<String, Map<TPrivilegeEntity, String>> getPrivilegeImage() { + return privilegeImage; + } + + public Map<String, List<String>> getRoleImage() { + return roleImage; + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/7db84b2f/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/QueryParamBuilder.java ---------------------------------------------------------------------- diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/QueryParamBuilder.java b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/QueryParamBuilder.java new file mode 100644 index 0000000..6075e3f --- /dev/null +++ 
b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/provider/db/service/persistent/QueryParamBuilder.java @@ -0,0 +1,429 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.provider.db.service.persistent; + +import com.google.common.base.Joiner; +import org.apache.sentry.provider.db.service.model.MSentryRole; +import org.apache.sentry.provider.db.service.model.MSentryUser; + +import javax.annotation.concurrent.NotThreadSafe; +import javax.jdo.Query; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +/** + * The QueryParamBuilder provides mechanism for constructing complex JDOQL queries with parameters. + * <p> + * There are many places where we want to construct a non-trivial parametrized query, + * often with sub-queries. A simple query expression is just a list of conditions joined + * by operation (either && or ||). More complicated query may contain sub-expressions which may contain + * further sub-expressions, for example {@code (AA && (B | (C && D)))}. 
+ * <p> + * Query may contain parameters, which usually come from external sources (e.g. Thrift requests). For example, + * to search for a record containing specific database name we can use something like + * {@code "dbName == " + request.getDbName()}. This opens a possibility JDOQL injection with carefully + * constructed requests. To avoid this we need to parameterize the query into something like + * {@code "dbName == :dbMame"} and pass {@code dbName} as a query parameter. (The colon in {@code :dbName} + * tells Datanucleus to automatically figure out the type of the {@code dbName} parameter). We collect all + * such parameters in a map and use {@code executeWithMap()} method of the query to pass all collected + * parameters to a query. + * <h2>Representing the query and sub-queries</h2> + * + * A query is represented as + * <ul> + * <li>Top-level query operation which is always && or ||</li> + * <li>List of strings that constitute simple subexpressions which are joined with the top level + * operator when the final query string is constructed</li> + * <li>List of sub-expressions. Each sub-expression is another QueryParamBuilder that shares + * the parameter map with the parent. Usually the top-level operation of the sub-expression + * is the inverse of the top-level operation if the parent. Since + * {@code (A && (B && C))} can be simplified as {@code (A && B && C)}, there is no + * need for parent and child to have the same top-level operation. 
+ * Children are added using the {@link #newChild()} method.</li> + * </ul> + * + * <h2>Constructing the string representation of a query</h2> + * + * Once the query is fully constructed, we can get its string reopresentation suitable for the + * {@code setFilter()} method of {@link javax.jdo.Query} by using {@link #toString()} method + * which does the following: + * <ul> + * <li>Combines all accumulated simple subexpressions using the top-level operator</li> + * <li>Recursively combines children QueryParamBuilder objects using their {@link #toString()} + * methods</li> + * <li>Joins result with the top-level operation</li> + * </ul> + * <em>NOTE that we do not guarantee specific order of expressions within each level</em>. + * <p> + * The class also provides useful common methods for checking that some field is or + * isn't <em>NULL</em> and method <em>addSet()</em> to add dynamic set of key/values<p> + * + * Most class methods return <em>this</em> so it is possible to chain calls together. + * <p> + * + * The class is not thread-safe. + * <p> + * Examples: + * <ol> + * <li> + * <pre>{@code + * QueryParamBuilder p = newQueryParamBuilder(); + * p.add("key1", "val1").add("key2", "val2") + * + * // Returns "(this.key1 == :key1 && this.key2 == :key2)" + * String queryStr = p.toString(); + * + * // Returns map {"key1": "val1", "key2": "val2"} + * Map<String, Object> args = p.getArguments(); + * + * Query query = pm.newQuery(Foo.class); + * query.setFilter(queryStr); + * query.executeWIthMap(args); + * }</pre> + * </li> + * <li> + * <pre>{@code + * QueryParamBuilder p = newQueryParamBuilder(); + * p.add("key1", "val1").add("key2", "val2") + * .newChild() // Inverts logical op from && to || + * .add("key3", "val3") + * .add("key4", "val4"). 
+ * + * // Returns "(this.key1 == :val1 && this.key2 == :val2 && \ + * // (this.key3 == :val4 || this.key4 == val4))" + * String queryStr1 = p.toString() + * }</pre> + * </li> + * </ol> + * + * @see <a href="http://www.datanucleus.org/products/datanucleus/jdo/jdoql.html">Datanucleus JDOQL</a> + */ +@NotThreadSafe +public class QueryParamBuilder { + + /** + * Representation of the top-level query operator. + * Query is built by joining all parts with the specified Op. + */ + enum Op { + AND(" && "), + OR(" || "); + + public String toString() { + return value; + } + + private final String value; + + /** Constructor from string */ + Op(String val) { + this.value = val; + } + } + + // Query parts that will be joined with Op + private final List<String> queryParts = new LinkedList<>(); + // List of children - allocated lazily when children are added + private List<QueryParamBuilder> children; + // Query Parameters + private final Map<String, Object> arguments; + // paramId is used for automatically generating variable names + private final AtomicLong paramId; + // Join operation + private final String operation; + + /** + * Create new {@link QueryParamBuilder} + * @return the default {@link QueryParamBuilder} with && top-level operation. + */ + public static QueryParamBuilder newQueryParamBuilder() { + return new QueryParamBuilder(); + } + + /** + * Create new {@link QueryParamBuilder} with specific top-level operator + * @param operation top-level operation for subexpressions + * @return {@link QueryParamBuilder} with specified top-level operator + */ + public static QueryParamBuilder newQueryParamBuilder(Op operation) { + return new QueryParamBuilder(operation); + } + + /** + * Create a new child builder and attach to this one. Child's join operation is the + * inverse of the parent + * @return new child of the QueryBuilder + */ + public QueryParamBuilder newChild() { + // Reverse operation of this builder + Op operation = this.operation.equals(Op.AND.toString()) ? 
Op.OR : Op.AND; + return this.newChild(operation); + } + + /** + * Create a new child builder attached to this one + * @param operation - join operation + * @return new child of the QueryBuilder + */ + private QueryParamBuilder newChild(Op operation) { + QueryParamBuilder child = new QueryParamBuilder(this, operation); + if (children == null) { + children = new LinkedList<>(); + } + children.add(child); + return child; + } + + /** + * Get query arguments + * @return query arguments as a map of arg name and arg value suitable for + * query.executeWithMap + */ + public Map<String, Object> getArguments() { + return arguments; + } + + /** + * Get query string - reconstructs the query string from all the parts and children. + * @return Query string which can be matched with arguments to execute a query. + */ + @Override + public String toString() { + if (children == null && queryParts.isEmpty()) { + return ""; + } + if (children == null) { + return "(" + Joiner.on(operation).join(queryParts) + ")"; + } + // Concatenate our query parts with all children + List<String> result = new LinkedList<>(queryParts); + for (Object child: children) { + result.add(child.toString()); + } + return "(" + Joiner.on(operation).join(result) + ")"; + } + + /** + * Add parameter for field fieldName with given value where value is any Object + * @param fieldName Field name to query for + * @param value Field value (can be any Object) + */ + public QueryParamBuilder addObject(String fieldName, Object value) { + return addCustomParam("this." + fieldName + " == :" + fieldName, + fieldName, value); + } + + /** + * Add string parameter for field fieldName + * @param fieldName name of the field + * @param value String value. 
Value is normalized - converted to lower case and trimmed + * @return this + */ + public QueryParamBuilder add(String fieldName, String value) { + return addCommon(fieldName, value, false); + } + + /** + * Add string parameter to field value with or without normalization + * @param fieldName field name of the field + * @param value String value, inserted as is if preserveCase is true, normalized otherwise + * @param preserveCase if true, trm and lowercase the value. + * @return this + */ + public QueryParamBuilder add(String fieldName, String value, boolean preserveCase) { + return addCommon(fieldName, value, preserveCase); + } + + /** + * Add condition that fieldName is not equal to NULL + * @param fieldName field name to compare to NULL + * @return this + */ + public QueryParamBuilder addNotNull(String fieldName) { + queryParts.add(String.format("this.%s != \"%s\"", fieldName, SentryStore.NULL_COL)); + return this; + } + + /** + * Add condition that fieldName is equal to NULL + * @param fieldName field name to compare to NULL + * @return this + */ + public QueryParamBuilder addNull(String fieldName) { + queryParts.add(String.format("this.%s == \"%s\"", fieldName, SentryStore.NULL_COL)); + return this; + } + + /** + * Add custom string for evaluation together with a single parameter. + * This is used in cases where we need expression different from this.name == value + * @param expr String expression containing ':< paramName>' somewhere + * @param paramName parameter name + * @param value parameter value + * @return this + */ + QueryParamBuilder addCustomParam(String expr, String paramName, Object value) { + arguments.put(paramName, value); + queryParts.add(expr); + return this; + } + + /** + * Add arbitrary query string without parameters + * @param expr String expression + * @return this + */ + QueryParamBuilder addString(String expr) { + queryParts.add(expr); + return this; + } + + /** + * Add common filter for set of Sentry roles. 
This is used to simplify creating filters for + * privileges belonging to the specified set of roles. + * @param query Query used for search + * @param paramBuilder paramBuilder for parameters + * @param roleNames set of role names + * @return paramBuilder supplied or a new one if the supplied one is null. + */ + public static QueryParamBuilder addRolesFilter(Query query, QueryParamBuilder paramBuilder, + Set<String> roleNames) { + query.declareVariables(MSentryRole.class.getName() + " role"); + if (paramBuilder == null) { + paramBuilder = new QueryParamBuilder(); + } + if (roleNames == null || roleNames.isEmpty()) { + return paramBuilder; + } + paramBuilder.newChild().addSet("role.roleName == ", roleNames); + paramBuilder.addString("roles.contains(role)"); + return paramBuilder; + } + + /** + * Add common filter for set of Sentry users. This is used to simplify creating filters for + * privileges belonging to the specified set of users. + * @param query Query used for search + * @param paramBuilder paramBuilder for parameters + * @param userNames set of user names + * @return paramBuilder supplied or a new one if the supplied one is null. + */ + public static QueryParamBuilder addUsersFilter(Query query, QueryParamBuilder paramBuilder, + Set<String> userNames) { + query.declareVariables(MSentryUser.class.getName() + " user"); + if (paramBuilder == null) { + paramBuilder = new QueryParamBuilder(); + } + if (userNames == null || userNames.isEmpty()) { + return paramBuilder; + } + paramBuilder.newChild().addSet("user.userName == ", userNames); + paramBuilder.addString("users.contains(user)"); + return paramBuilder; + } + + /** + * Add multiple conditions for set of values. 
+ * <p> + * Example: + * <pre> + * Set<String>names = new HashSet<>(); + * names.add("foo"); + * names.add("bar"); + * names.add("bob"); + * paramBuilder.addSet("prefix == ", names); + * // Expect:"(prefix == :var0 && prefix == :var1 && prefix == :var2)" + * paramBuilder.toString()); + * // paramBuilder(getArguments()) contains mapping for var0, var1 and var2 + * </pre> + * @param prefix common prefix to use for expression + * @param values + * @return this + */ + QueryParamBuilder addSet(String prefix, Iterable<String> values) { + if (values == null) { + return this; + } + + // Add expressions of the form 'prefix :var$i' + for(String name: values) { + // Append index to the varName + String vName = "var" + paramId.toString(); + addCustomParam(prefix + ":" + vName, vName, name.trim().toLowerCase()); + paramId.incrementAndGet(); + } + return this; + } + + /** + * Construct a default QueryParamBuilder joining arguments with && + */ + private QueryParamBuilder() { + this(Op.AND); + } + + /** + * Construct generic QueryParamBuilder + * @param operation join operation (AND or OR) + */ + private QueryParamBuilder(Op operation) { + this.arguments = new HashMap<>(); + this.operation = operation.toString(); + this.paramId = new AtomicLong(0); + } + + /** + * Internal constructor used for children - reuses arguments from parent + * @param parent parent element + * @param operation join operation + */ + private QueryParamBuilder(QueryParamBuilder parent, Op operation) { + this.arguments = parent.getArguments(); + this.operation = operation.toString(); + this.paramId = parent.paramId; + } + + /** + * common code for adding string values + * @param fieldName field name to add + * @param value field value + * @param preserveCase if true, do not trim and lower + * @return this + * @throws IllegalArgumentException if fieldName was already added before + */ + private QueryParamBuilder addCommon(String fieldName, String value, + boolean preserveCase) { + Object oldValue; + if 
(preserveCase) { + oldValue = arguments.put(fieldName, SentryStore.toNULLCol(SentryStore.safeTrim(value))); + } else { + oldValue = arguments.put(fieldName, SentryStore.toNULLCol(SentryStore.safeTrimLower(value))); + } + if (oldValue != null) { + // Attempt to insert the same field twice + throw new IllegalArgumentException("field " + fieldName + "already exists"); + } + queryParts.add("this." + fieldName + " == :" + fieldName); + return this; + } +}
