SENTRY-1613: Add propagating logic for Perm/Path updates in Sentry service (Hao Hao, Reviewed by: Alexander Kolbasov and Lei Xu)
Change-Id: I1223a45df8ab1c169772b2ffe92762f0dcc4e82e Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/2811311e Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/2811311e Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/2811311e Branch: refs/heads/sentry-ha-redesign Commit: 2811311ea6dfe2e26af67a545333486ca0e89092 Parents: 268ee50 Author: hahao <[email protected]> Authored: Thu Mar 23 17:51:15 2017 -0700 Committer: hahao <[email protected]> Committed: Fri Mar 24 12:51:39 2017 -0700 ---------------------------------------------------------------------- .../org/apache/sentry/hdfs/DeltaRetriever.java | 67 ++++ .../org/apache/sentry/hdfs/ImageRetriever.java | 7 +- .../apache/sentry/hdfs/ThriftSerializer.java | 10 +- .../apache/sentry/hdfs/DBUpdateForwarder.java | 88 ++++ .../org/apache/sentry/hdfs/MetastorePlugin.java | 397 ------------------- .../apache/sentry/hdfs/PathDeltaRetriever.java | 76 ++++ .../apache/sentry/hdfs/PathImageRetriever.java | 26 +- .../apache/sentry/hdfs/PermDeltaRetriever.java | 76 ++++ .../apache/sentry/hdfs/PermImageRetriever.java | 14 +- .../sentry/hdfs/SentryHDFSServiceProcessor.java | 29 +- .../sentry/hdfs/SentryHdfsMetricsUtil.java | 19 - .../org/apache/sentry/hdfs/SentryPlugin.java | 168 +++----- .../org/apache/sentry/hdfs/UpdateForwarder.java | 335 ---------------- .../sentry/hdfs/UpdateablePermissions.java | 63 --- .../apache/sentry/hdfs/TestUpdateForwarder.java | 359 ----------------- .../db/service/persistent/SentryStore.java | 176 +++++++- .../sentry/service/thrift/HMSFollower.java | 2 - .../db/service/persistent/TestSentryStore.java | 13 +- .../tests/e2e/hdfs/TestHDFSIntegration.java | 2 +- .../e2e/hdfs/TestHDFSIntegrationAdvanced.java | 1 + .../tests/e2e/hdfs/TestHDFSIntegrationBase.java | 3 +- .../e2e/hdfs/TestHDFSIntegrationEnd2End.java | 1 + 22 files changed, 572 insertions(+), 1360 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/DeltaRetriever.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/DeltaRetriever.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/DeltaRetriever.java new file mode 100644 index 0000000..0e58593 --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/DeltaRetriever.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.hdfs; + +import java.util.Collection; + +import static org.apache.sentry.hdfs.Updateable.Update; + +/** + * DeltaRetriever obtains a delta update of either Sentry Permissions or Sentry + * representation of HMS Paths. + * <p> + * Sentry permissions are represented as {@link PermissionsUpdate} and HMS Paths + * are represented as {@link PathsUpdate}. The delta update contains change + * from a state to another. 
+ * The {@link #retrieveDelta(long)} method obtains such delta update from a persistent storage. + * Delta update is propagated to a consumer of Sentry, such as HDFS NameNode whenever + * the consumer needs to synchronize the update. + */ +public interface DeltaRetriever<K extends Update> { + + /** + * Retrieves all delta updates of type {@link Update} newer than or equal to + * the given sequence number/change ID (inclusive) from a persistent storage. + * An empty collection can be returned. + * + * @param seqNum the given seq number + * @return a collection of delta updates of type K + * @throws Exception when there is an error in operation on persistent storage + */ + Collection<K> retrieveDelta(long seqNum) throws Exception; + + /** + * Checks if the delta update is available, given the sequence number/change + * ID, from a persistent storage. + * + * @param seqNum the given seq number + * @return true if there are such delta updates available. + * Otherwise it will be false. + * @throws Exception when there is an error in operation on persistent storage + */ + boolean isDeltaAvailable(long seqNum) throws Exception; + + /** + * Gets the latest updated delta ID. + * + * @return the latest updated delta ID. 
+ * @throws Exception when there is an error in operation on persistent storage + */ + long getLatestDeltaID() throws Exception; +} http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java index 0e40756..e96140d 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java @@ -25,7 +25,7 @@ import static org.apache.sentry.hdfs.Updateable.Update; * ({@code PathsUpdate}). * <p> * The snapshot image should represent a consistent state. - * The {@link #retrieveFullImage(long)} method obtains such state snapshot from + * The {@link #retrieveFullImage()} method obtains such state snapshot from * a persistent storage. * The Snapshots are propagated to a consumer of Sentry, such as HDFS NameNode, * whenever the consumer needs to synchronize its full state. @@ -33,13 +33,12 @@ import static org.apache.sentry.hdfs.Updateable.Update; public interface ImageRetriever<K extends Update> { /** - * Retrieve a complete snapshot of type {@code Update} from a persistent storage. + * Retrieves a complete snapshot of type {@code Update} from a persistent storage. 
* - * @param seqNum * @return a complete snapshot of type {@link Update}, e.g {@link PermissionsUpdate} * or {@link PathsUpdate} * @throws Exception */ - K retrieveFullImage(long seqNum) throws Exception; + K retrieveFullImage() throws Exception; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java index 69aa098..d7b9923 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java @@ -25,12 +25,12 @@ import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TCompactProtocol; -import org.apache.thrift.protocol.TSimpleJSONProtocol; +import org.apache.thrift.protocol.TJSONProtocol; public class ThriftSerializer { - final static private TSimpleJSONProtocol.Factory tSimpleJSONProtocol = - new TSimpleJSONProtocol.Factory(); + final static private TJSONProtocol.Factory tJSONProtocol = + new TJSONProtocol.Factory(); // Use default max thrift message size here. // TODO: Figure out a way to make maxMessageSize configurable, eg. create a serializer singleton at startup by @@ -67,13 +67,13 @@ public class ThriftSerializer { public static String serializeToJSON(TBase base) throws TException { // Initiate a new TSerializer each time for thread safety. 
- TSerializer tSerializer = new TSerializer(tSimpleJSONProtocol); + TSerializer tSerializer = new TSerializer(tJSONProtocol); return tSerializer.toString(base); } public static void deserializeFromJSON(TBase base, String dataInJson) throws TException { // Initiate a new TDeserializer each time for thread safety. - TDeserializer tDeserializer = new TDeserializer(tSimpleJSONProtocol); + TDeserializer tDeserializer = new TDeserializer(tJSONProtocol); tDeserializer.fromString(base, dataInJson); } http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java new file mode 100644 index 0000000..b8542b3 --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sentry.hdfs; + +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +import org.apache.sentry.provider.db.service.persistent.SentryStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.ThreadSafe; + +/** + * DBUpdateForwarder propagates a complete snapshot or delta update of either + * Sentry Permissions ({@code PermissionsUpdate}) or Sentry representation of + * HMS Paths ({@code PathsUpdate}), retrieved from a persistent storage, to a + * Sentry client, e.g. HDFS NameNode. + * <p> + * It is a thread safe class, as all the underlying database operation is thread safe. + */ +@ThreadSafe +class DBUpdateForwarder<K extends Updateable.Update> { + + private final ImageRetriever<K> imageRetriever; + private final DeltaRetriever<K> deltaRetriever; + private static final Logger LOGGER = LoggerFactory.getLogger(DBUpdateForwarder.class); + + DBUpdateForwarder(final ImageRetriever<K> imageRetriever, + final DeltaRetriever<K> deltaRetriever) { + this.imageRetriever = imageRetriever; + this.deltaRetriever = deltaRetriever; + } + + /** + * Retrieves all delta updates from the requested sequence number (inclusive) from + * a persistent storage. + * It first checks whether such newer deltas exist in the persistent storage. + * If there are, returns a list of delta updates. + * Otherwise, a complete snapshot will be returned. + * + * @param seqNum the requested sequence number + * @return a list of delta updates, e.g. {@link PathsUpdate} or {@link PermissionsUpdate} + */ + List<K> getAllUpdatesFrom(long seqNum) throws Exception { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("#### GetAllUpdatesFrom [reqSeqNum = {} ]", seqNum); + } + + // No newer updates available than the requested one. 
+ long curSeqNum = deltaRetriever.getLatestDeltaID(); + if (seqNum > curSeqNum) { + return Collections.emptyList(); + } + + // Checks if there is newer deltas exists in the persistent storage. + // If there is, returns a list of delta updates. + if ((seqNum != SentryStore.INIT_CHANGE_ID) && + deltaRetriever.isDeltaAvailable(seqNum)) { + Collection<K> deltas = deltaRetriever.retrieveDelta(seqNum); + if (!deltas.isEmpty()) { + return new LinkedList<>(deltas); + } + } + + // Otherwise, a complete snapshot will be returned. + List<K> retVal = new LinkedList<>(); + retVal.add(imageRetriever.retrieveFullImage()); + return retVal; + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java deleted file mode 100644 index 16ffa1b..0000000 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/MetastorePlugin.java +++ /dev/null @@ -1,397 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sentry.hdfs; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import com.codahale.metrics.Timer; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler; -import org.apache.hadoop.hive.metastore.MetaStorePreEventListener; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; -import org.apache.sentry.provider.db.SentryMetastoreListenerPlugin; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -/** - * Plugin implementation of {@link SentryMetastoreListenerPlugin} that hooks - * into the sites in the {@link MetaStorePreEventListener} that deal with - * creation/updation and deletion for paths. - */ -public class MetastorePlugin extends SentryMetastoreListenerPlugin { - - private static final Logger LOGGER = LoggerFactory.getLogger(MetastorePlugin.class); - - private static final String initializationFailureMsg = "Cache failed to initialize, cannot send path updates to Sentry." + - " Please review HMS error logs during startup for additional information. 
If the initialization failure is due" + - " to SentryMalformedPathException, you will need to rectify the malformed path in HMS db and restart HMS"; - - class SyncTask implements Runnable { - @Override - public void run() { - if (!notificiationLock.tryLock()) { - // No need to sync.. as metastore is in the process of pushing an update.. - return; - } - if (MetastorePlugin.this.authzPaths == null) { - LOGGER.warn(initializationFailureMsg); - return; - } - try { - long lastSeenBySentry = - MetastorePlugin.this.getClient().getLastSeenHMSPathSeqNum(); - long lastSent = lastSentSeqNum; - if (lastSeenBySentry != lastSent) { - LOGGER.warn("#### Sentry not in sync with HMS [" + lastSeenBySentry + ", " - + lastSent + "]"); - PathsUpdate fullImageUpdate = - MetastorePlugin.this.authzPaths.createFullImageUpdate(lastSent); - notifySentryNoLock(fullImageUpdate); - LOGGER.warn("#### Synced Sentry with update [" + lastSent + "]"); - } - } catch (Exception e) { - sentryClient = null; - LOGGER.error("Error talking to Sentry HDFS Service !!", e); - } finally { - syncSent = true; - notificiationLock.unlock(); - } - } - } - - private final Configuration conf; - private SentryHDFSServiceClient sentryClient; - private volatile UpdateableAuthzPaths authzPaths; - private Lock notificiationLock; - - // Initialized to some value > 1. 
- protected static final AtomicLong seqNum = new AtomicLong(5); - - // Has to match the value of seqNum - protected static volatile long lastSentSeqNum = seqNum.get(); - private volatile boolean syncSent = false; - private volatile boolean initComplete = false; - private volatile boolean queueFlushComplete = false; - private volatile Throwable initError = null; - private final Queue<PathsUpdate> updateQueue = new LinkedList<PathsUpdate>(); - - private final ExecutorService threadPool; //NOPMD - private final Configuration sentryConf; - - static class ProxyHMSHandler extends HMSHandler { - public ProxyHMSHandler(String name, HiveConf conf) throws MetaException { - super(name, conf); - } - } - - public MetastorePlugin(Configuration conf, Configuration sentryConf) { - this.notificiationLock = new ReentrantLock(); - - if (!(conf instanceof HiveConf)) { - String error = "Configuration is not an instanceof HiveConf"; - LOGGER.error(error); - throw new RuntimeException(error); - } - this.conf = new HiveConf((HiveConf)conf); - - this.sentryConf = new Configuration(sentryConf); - this.conf.unset(HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS.varname); - this.conf.unset(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname); - this.conf.unset(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS.varname); - this.conf.unset(HiveConf.ConfVars.METASTOREURIS.varname); - Thread initUpdater = new Thread() { - @Override - public void run() { - MetastoreCacheInitializer cacheInitializer = null; - try { - cacheInitializer = - new MetastoreCacheInitializer(new ProxyHMSHandler("sentry.hdfs", - (HiveConf) MetastorePlugin.this.conf), - MetastorePlugin.this.conf); - MetastorePlugin.this.authzPaths = - cacheInitializer.createInitialUpdate(); - LOGGER.info("#### Metastore Plugin initialization complete !!"); - synchronized (updateQueue) { - while (!updateQueue.isEmpty()) { - PathsUpdate update = updateQueue.poll(); - if (update != null) { - processUpdate(update); - } - } - queueFlushComplete = 
true; - } - LOGGER.info("#### Finished flushing queued updates to Sentry !!"); - } catch (Exception e) { - LOGGER.error("#### Could not create Initial AuthzPaths or HMSHandler !!", e); - initError = e; - } finally { - if (cacheInitializer != null) { - try { - cacheInitializer.close(); - } catch (Exception e) { - LOGGER.info("#### Exception while closing cacheInitializer !!", e); - } - } - initComplete = true; - } - } - }; - if (this.conf.getBoolean( - ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE, - ServerConfig - .SENTRY_HDFS_SYNC_METASTORE_CACHE_ASYNC_INIT_ENABLE_DEFAULT)) { - LOGGER.warn("#### Metastore Cache initialization is set to aync..." + - "HDFS ACL synchronization will not happen until metastore" + - "cache initialization is completed !!"); - initUpdater.start(); - } else { - initUpdater.run(); //NOPMD - } - try { - sentryClient = SentryHDFSServiceClientFactory.create(sentryConf); - } catch (Exception e) { - sentryClient = null; - LOGGER.error("Could not connect to Sentry HDFS Service !!", e); - } - ScheduledExecutorService newThreadPool = Executors.newScheduledThreadPool(1); - newThreadPool.scheduleWithFixedDelay(new SyncTask(), - this.conf.getLong(ServerConfig - .SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS, - ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT), - this.conf.getLong(ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_MS, - ServerConfig.SENTRY_HDFS_SYNC_CHECKER_PERIOD_DEFAULT), - TimeUnit.MILLISECONDS); - this.threadPool = newThreadPool; - } - - @Override - public void addPath(String authzObj, String path) { - List<String> pathTree = null; - try { - pathTree = PathsUpdate.parsePath(path); - } catch (SentryMalformedPathException e) { - LOGGER.error("Unexpected path in addPath: authzObj = " + authzObj + " , path = " + path); - e.printStackTrace(); - return; - } - if(pathTree == null) { - return; - } - LOGGER.debug("#### HMS Path Update [" - + "OP : addPath, " - + "authzObj : " + authzObj.toLowerCase() + ", " - + "path : " + path 
+ "]"); - PathsUpdate update = createHMSUpdate(); - update.newPathChange(authzObj.toLowerCase()).addToAddPaths(pathTree); - notifySentryAndApplyLocal(update); - } - - @Override - public void removeAllPaths(String authzObj, List<String> childObjects) { - LOGGER.debug("#### HMS Path Update [" - + "OP : removeAllPaths, " - + "authzObj : " + authzObj.toLowerCase() + ", " - + "childObjs : " + (childObjects == null ? "[]" : childObjects) + "]"); - PathsUpdate update = createHMSUpdate(); - if (childObjects != null) { - for (String childObj : childObjects) { - update.newPathChange(authzObj.toLowerCase() + "." + childObj).addToDelPaths( - Lists.newArrayList(PathsUpdate.ALL_PATHS)); - } - } - update.newPathChange(authzObj.toLowerCase()).addToDelPaths( - Lists.newArrayList(PathsUpdate.ALL_PATHS)); - notifySentryAndApplyLocal(update); - } - - @Override - public void removePath(String authzObj, String path) { - if ("*".equals(path)) { - removeAllPaths(authzObj.toLowerCase(), null); - } else { - List<String> pathTree = null; - try { - pathTree = PathsUpdate.parsePath(path); - } catch (SentryMalformedPathException e) { - LOGGER.error("Unexpected path in removePath: authzObj = " + authzObj + " , path = " + path); - e.printStackTrace(); - return; - } - if(pathTree == null) { - return; - } - LOGGER.debug("#### HMS Path Update [" - + "OP : removePath, " - + "authzObj : " + authzObj.toLowerCase() + ", " - + "path : " + path + "]"); - PathsUpdate update = createHMSUpdate(); - update.newPathChange(authzObj.toLowerCase()).addToDelPaths(pathTree); - notifySentryAndApplyLocal(update); - } - } - - @Override - public void renameAuthzObject(String oldName, String oldPath, String newName, - String newPath) { - String oldNameLC = oldName != null ? oldName.toLowerCase() : null; - String newNameLC = newName != null ? 
newName.toLowerCase() : null; - PathsUpdate update = createHMSUpdate(); - LOGGER.debug("#### HMS Path Update [" - + "OP : renameAuthzObject, " - + "oldName : " + oldNameLC + "," - + "oldPath : " + oldPath + "," - + "newName : " + newNameLC + "," - + "newPath : " + newPath + "]"); - List<String> newPathTree = null; - try { - newPathTree = PathsUpdate.parsePath(newPath); - } catch (SentryMalformedPathException e) { - LOGGER.error("Unexpected path in renameAuthzObject while parsing newPath: oldName=" + oldName + ", oldPath=" + oldPath + - ", newName=" + newName + ", newPath=" + newPath); - e.printStackTrace(); - return; - } - - if( newPathTree != null ) { - update.newPathChange(newNameLC).addToAddPaths(newPathTree); - } - List<String> oldPathTree = null; - try { - oldPathTree = PathsUpdate.parsePath(oldPath); - } catch (SentryMalformedPathException e) { - LOGGER.error("Unexpected path in renameAuthzObject while parsing oldPath: oldName=" + oldName + ", oldPath=" + oldPath + - ", newName=" + newName + ", newPath=" + newPath); - e.printStackTrace(); - return; - } - - if( oldPathTree != null ) { - update.newPathChange(oldNameLC).addToDelPaths(oldPathTree); - } - notifySentryAndApplyLocal(update); - } - - private SentryHDFSServiceClient getClient() { - if (sentryClient == null) { - try { - sentryClient = SentryHDFSServiceClientFactory.create(sentryConf); - } catch (Exception e) { - sentryClient = null; - LOGGER.error("Could not connect to Sentry HDFS Service !!", e); - } - } - return sentryClient; - } - - private PathsUpdate createHMSUpdate() { - PathsUpdate update = new PathsUpdate(seqNum.incrementAndGet(), false); - LOGGER.debug("#### Creating HMS Path Update SeqNum : [" + seqNum.get() + "]"); - return update; - } - - protected void notifySentryNoLock(PathsUpdate update) { - final Timer.Context timerContext = - SentryHdfsMetricsUtil.getNotifyHMSUpdateTimer.time(); - try { - getClient().notifyHMSUpdate(update); - } catch (Exception e) { - LOGGER.error("Could not send 
update to Sentry HDFS Service !!", e); - SentryHdfsMetricsUtil.getFailedNotifyHMSUpdateCounter.inc(); - } finally { - timerContext.stop(); - } - } - - protected void notifySentry(PathsUpdate update) { - notificiationLock.lock(); - try { - if (!syncSent) { - new SyncTask().run(); - } - - notifySentryNoLock(update); - } finally { - lastSentSeqNum = update.getSeqNum(); - notificiationLock.unlock(); - LOGGER.debug("#### HMS Path Last update sent : ["+ lastSentSeqNum + "]"); - } - } - - protected void applyLocal(PathsUpdate update) { - final Timer.Context timerContext = - SentryHdfsMetricsUtil.getApplyLocalUpdateTimer.time(); - if(authzPaths == null) { - LOGGER.error(initializationFailureMsg); - return; - } - authzPaths.updatePartial(Lists.newArrayList(update), new ReentrantReadWriteLock()); - timerContext.stop(); - SentryHdfsMetricsUtil.getApplyLocalUpdateHistogram.update( - update.getPathChanges().size()); - } - - private void notifySentryAndApplyLocal(PathsUpdate update) { - if(authzPaths == null) { - LOGGER.error(initializationFailureMsg); - return; - } - if (initComplete) { - processUpdate(update); - } else { - if (initError == null) { - synchronized (updateQueue) { - if (!queueFlushComplete) { - updateQueue.add(update); - } else { - processUpdate(update); - } - } - } else { - StringWriter sw = new StringWriter(); - initError.printStackTrace(new PrintWriter(sw)); - LOGGER.error("#### Error initializing Metastore Plugin" + - "[" + sw.toString() + "] !!"); - throw new RuntimeException(initError); - } - LOGGER.warn("#### Path update [" + update.getSeqNum() + "] not sent to Sentry.." 
+ - "Metastore hasn't been initialized yet !!"); - } - } - - protected void processUpdate(PathsUpdate update) { - applyLocal(update); - notifySentry(update); - } - -} http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathDeltaRetriever.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathDeltaRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathDeltaRetriever.java new file mode 100644 index 0000000..cea5b9d --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathDeltaRetriever.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sentry.hdfs; + +import org.apache.sentry.provider.db.service.model.MSentryPathChange; +import org.apache.sentry.provider.db.service.persistent.SentryStore; + +import javax.annotation.concurrent.ThreadSafe; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +/** + * PathDeltaRetriever retrieves delta updates of Hive Paths from a persistent + * storage and translates them into a collection of {@code PathsUpdate} that the + * consumers, such as HDFS NameNode, can understand. + * <p> + * It is a thread safe class, as all the underlying database operation is thread safe. + */ +@ThreadSafe +public class PathDeltaRetriever implements DeltaRetriever<PathsUpdate> { + + private final SentryStore sentryStore; + + PathDeltaRetriever(SentryStore sentryStore) { + this.sentryStore = sentryStore; + } + + @Override + public Collection<PathsUpdate> retrieveDelta(long seqNum) throws Exception { + Collection<MSentryPathChange> mSentryPathChanges = + sentryStore.getMSentryPathChanges(seqNum); + if (mSentryPathChanges.isEmpty()) { + return Collections.emptyList(); + } + + Collection<PathsUpdate> updates = new ArrayList<>(mSentryPathChanges.size()); + for (MSentryPathChange mSentryPathChange : mSentryPathChanges) { + // Gets the changeID from the persisted MSentryPathChange. + long changeID = mSentryPathChange.getChangeID(); + // Creates a corresponding PathsUpdate and deserialize the + // persisted delta update in JSON format to TPathsUpdate with + // associated changeID. 
+ PathsUpdate pathsUpdate = new PathsUpdate(); + pathsUpdate.JSONDeserialize(mSentryPathChange.getPathChange()); + pathsUpdate.setSeqNum(changeID); + updates.add(pathsUpdate); + } + return updates; + } + + @Override + public boolean isDeltaAvailable(long seqNum) throws Exception { + return sentryStore.pathChangeExists(seqNum); + } + + @Override + public long getLatestDeltaID() throws Exception { + return sentryStore.getLastProcessedPathChangeID(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java index 16a1604..0eaac80 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java @@ -18,28 +18,35 @@ package org.apache.sentry.hdfs; import com.codahale.metrics.Timer; +import com.google.common.collect.Lists; import org.apache.sentry.hdfs.service.thrift.TPathChanges; import org.apache.sentry.provider.db.service.persistent.PathsImage; import org.apache.sentry.provider.db.service.persistent.SentryStore; +import javax.annotation.concurrent.ThreadSafe; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * PathImageRetriever obtains a complete snapshot of Hive Paths from a persistent - * storage and translate it into {@code PathsUpdate} that the consumers, such as - * HDFS NameNod, can understand. + * storage and translates it into {@code PathsUpdate} that the consumers, such as + * HDFS NameNode, can understand. 
+ * <p> + * It is a thread safe class, as all the underlying database operation is thread safe. */ +@ThreadSafe public class PathImageRetriever implements ImageRetriever<PathsUpdate> { private final SentryStore sentryStore; + private final static String[] root = {"/"}; PathImageRetriever(SentryStore sentryStore) { this.sentryStore = sentryStore; } @Override - public PathsUpdate retrieveFullImage(long seqNum) throws Exception { + public PathsUpdate retrieveFullImage() throws Exception { try (final Timer.Context timerContext = SentryHdfsMetricsUtil.getRetrievePathFullImageTimer.time()) { @@ -54,8 +61,7 @@ public class PathImageRetriever implements ImageRetriever<PathsUpdate> { // Adds all <hiveObj, paths> mapping to be included in this paths update. // And label it with the latest delta change sequence number for consumer // to be aware of the next delta change it should continue with. - // TODO: use curSeqNum from DB instead of seqNum when doing SENTRY-1613 - PathsUpdate pathsUpdate = new PathsUpdate(seqNum, true); + PathsUpdate pathsUpdate = new PathsUpdate(curSeqNum, true); for (Map.Entry<String, Set<String>> pathEnt : pathImage.entrySet()) { TPathChanges pathChange = pathsUpdate.newPathChange(pathEnt.getKey()); @@ -66,7 +72,15 @@ public class PathImageRetriever implements ImageRetriever<PathsUpdate> { SentryHdfsMetricsUtil.getPathChangesHistogram.update(pathsUpdate .getPathChanges().size()); + + // Translate PathsUpdate that contains a full image to TPathsDump for + // consumer (NN) to be able to quickly construct UpdateableAuthzPaths + // from TPathsDump. 
+ UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths(root); + authzPaths.updatePartial(Lists.newArrayList(pathsUpdate), + new ReentrantReadWriteLock()); + pathsUpdate.toThrift().setPathsDump(authzPaths.getPathsDump().createPathsDump()); return pathsUpdate; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermDeltaRetriever.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermDeltaRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermDeltaRetriever.java new file mode 100644 index 0000000..9649b02 --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermDeltaRetriever.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.sentry.hdfs; + +import org.apache.sentry.provider.db.service.model.MSentryPermChange; +import org.apache.sentry.provider.db.service.persistent.SentryStore; + +import javax.annotation.concurrent.ThreadSafe; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +/** + * PermDeltaRetriever retrieves delta updates of Sentry permission from a persistent + * storage and translates it into a collection of {@code PermissionsUpdate} that the + * consumers, such as HDFS NameNode, can understand. + * <p> + * It is a thread safe class, as all the underlying database operation is thread safe. + */ +@ThreadSafe +public class PermDeltaRetriever implements DeltaRetriever<PermissionsUpdate> { + + private final SentryStore sentryStore; + + PermDeltaRetriever(SentryStore sentryStore) { + this.sentryStore = sentryStore; + } + + @Override + public Collection<PermissionsUpdate> retrieveDelta(long seqNum) throws Exception { + Collection<MSentryPermChange> mSentryPermChanges = + sentryStore.getMSentryPermChanges(seqNum); + if (mSentryPermChanges.isEmpty()) { + return Collections.emptyList(); + } + + Collection<PermissionsUpdate> updates = new ArrayList<>(mSentryPermChanges.size()); + for (MSentryPermChange mSentryPermChange : mSentryPermChanges) { + // Get the changeID from the persisted MSentryPermChange + long changeID = mSentryPermChange.getChangeID(); + // Create a corresponding PermissionsUpdate and deserialize the + // persisted delta update in JSON format to TPermissionsUpdate with + // associated changeID. 
+ PermissionsUpdate permsUpdate = new PermissionsUpdate(); + permsUpdate.JSONDeserialize(mSentryPermChange.getPermChange()); + permsUpdate.setSeqNum(changeID); + updates.add(permsUpdate); + } + return updates; + } + + @Override + public boolean isDeltaAvailable(long seqNum) throws Exception { + return sentryStore.permChangeExists(seqNum); + } + + @Override + public long getLatestDeltaID() throws Exception { + return sentryStore.getLastProcessedPermChangeID(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java index 3017c9e..5964f17 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java @@ -24,6 +24,7 @@ import org.apache.sentry.hdfs.service.thrift.TRoleChanges; import org.apache.sentry.provider.db.service.persistent.PermissionsImage; import org.apache.sentry.provider.db.service.persistent.SentryStore; +import javax.annotation.concurrent.ThreadSafe; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -31,9 +32,12 @@ import java.util.Map; /** * PermImageRetriever obtains a complete snapshot of Sentry permission from a persistent - * storage and translate it into {@code PermissionsUpdate} that the consumers, such as - * HDFS NameNod, can understand. + * storage and translates it into {@code PermissionsUpdate} that the consumers, such as + * HDFS NameNode, can understand. + * <p> + * It is a thread safe class, as all the underlying database operation is thread safe. 
*/ +@ThreadSafe public class PermImageRetriever implements ImageRetriever<PermissionsUpdate> { private final SentryStore sentryStore; @@ -43,7 +47,7 @@ public class PermImageRetriever implements ImageRetriever<PermissionsUpdate> { } @Override - public PermissionsUpdate retrieveFullImage(long seqNum) throws Exception { + public PermissionsUpdate retrieveFullImage() throws Exception { try(Timer.Context timerContext = SentryHdfsMetricsUtil.getRetrievePermFullImageTimer.time()) { @@ -80,8 +84,6 @@ public class PermImageRetriever implements ImageRetriever<PermissionsUpdate> { } PermissionsUpdate permissionsUpdate = new PermissionsUpdate(tPermUpdate); - // TODO: use curSeqNum from DB instead of seqNum when doing SENTRY-1567 - permissionsUpdate.setSeqNum(seqNum); SentryHdfsMetricsUtil.getPrivilegeChangesHistogram.update( tPermUpdate.getPrivilegeChangesSize()); SentryHdfsMetricsUtil.getRoleChangesHistogram.update( @@ -90,4 +92,4 @@ public class PermImageRetriever implements ImageRetriever<PermissionsUpdate> { } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java index e4f3f58..395618a 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java @@ -42,10 +42,6 @@ public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface { retVal.setAuthzPathUpdate(new LinkedList<TPathsUpdate>()); retVal.setAuthzPermUpdate(new LinkedList<TPermissionsUpdate>()); if (SentryPlugin.instance != 
null) { - if (SentryPlugin.instance.isOutOfSync()) { - throw new TException( - "This Sentry server is not communicating with other nodes and out of sync "); - } final Timer.Context timerContext = SentryHdfsMetricsUtil.getAllAuthzUpdatesTimer.time(); try { @@ -99,33 +95,12 @@ public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface { @Override public void handle_hms_notification(TPathsUpdate update) throws TException { - final Timer.Context timerContext = - SentryHdfsMetricsUtil.getHandleHmsNotificationTimer.time(); - try { - PathsUpdate hmsUpdate = new PathsUpdate(update); - if (SentryPlugin.instance != null) { - SentryPlugin.instance.handlePathUpdateNotification(hmsUpdate); - LOGGER.debug("Authz Paths update [" + hmsUpdate.getSeqNum() + "].."); - } else { - LOGGER.error("SentryPlugin not initialized yet !!"); - } - } catch (Exception e) { - LOGGER.error("Error handling notification from HMS", e); - SentryHdfsMetricsUtil.getFailedHandleHmsNotificationCounter.inc(); - throw new TException(e); - } finally { - timerContext.stop(); - SentryHdfsMetricsUtil.getHandleHmsPathChangeHistogram.update( - update.getPathChangesSize()); - if (update.isHasFullImage()) { - SentryHdfsMetricsUtil.getHandleHmsHasFullImageCounter.inc(); - } - } + throw new UnsupportedOperationException("handle_hms_notification"); } @Override public long check_hms_seq_num(long pathSeqNum) throws TException { - return SentryPlugin.instance.getLastSeenHMSPathSeqNum(); + throw new UnsupportedOperationException("check_hms_seq_num"); } /** http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java index be14569..28bf20e 
100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHdfsMetricsUtil.java @@ -83,25 +83,6 @@ public class SentryHdfsMetricsUtil { MetricRegistry.name(PathImageRetriever.class, "retrieve-path-full-image", "path-changes-size")); - - // Metrics for notifySentry HMS update in MetaStorePlugin - // The timer used for each notifySentry - public static final Timer getNotifyHMSUpdateTimer = sentryMetrics.getTimer( - MetricRegistry.name(MetastorePlugin.class, "notify-sentry-HMS-update")); - // The number of failed notifySentry - public static final Counter getFailedNotifyHMSUpdateCounter = sentryMetrics.getCounter( - MetricRegistry.name(MetastorePlugin.class, "notify-sentry-HMS-update", - "failed-num")); - - // Metrics for applyLocal update in MetastorePlugin - // The time used for each applyLocal - public static final Timer getApplyLocalUpdateTimer = sentryMetrics.getTimer( - MetricRegistry.name(MetastorePlugin.class, "apply-local-update")); - // The size of path changes for each applyLocal - public static final Histogram getApplyLocalUpdateHistogram = sentryMetrics.getHistogram( - MetricRegistry.name(MetastorePlugin.class, "apply-local-update", - "path-change-size")); - private SentryHdfsMetricsUtil() { // Make constructor private to avoid instantiation } http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java index 029f9d5..0bd0833 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java +++ 
b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java @@ -46,22 +46,22 @@ import org.slf4j.LoggerFactory; import static org.apache.sentry.hdfs.Updateable.Update; /** - * SentryPlugin facilitates HDFS synchronization between HMS and NameNode. + * SentryPlugin listens to all sentry permission update events, persists permission + * changes into database. It also facilitates HDFS synchronization between HMS and NameNode. * <p> - * Normally, synchronization happens via partial (incremental) updates: + * Synchronization happens via a complete snapshot or partial (incremental) updates. + * Normally, it is the latter: * <ol> * <li> - * Whenever updates happen on HMS, they are immediately pushed to Sentry. - * Commonly, it's a single update per remote call. + * Whenever updates happen on HMS, a corresponding notification log is generated, + * and {@link HMSFollower} will process the notification event and persist it in database. * <li> * The NameNode periodically asks Sentry for updates. Sentry may return zero - * or more updates previously received from HMS. + * or more updates previously received via HMS notification log. * </ol> * <p> - * Each individual update is assigned a corresponding sequence number. Those - * numbers serve to detect the out-of-sync situations between HMS and Sentry and - * between Sentry and NameNode. Detecting out-of-sync situation triggers full - * update between the components that are out-of-sync. + * Each individual update is assigned a corresponding sequence number to synchronize + * updates between Sentry and NameNode. * <p> * SentryPlugin also implements signal-triggered mechanism of full path * updates from HMS to Sentry and from Sentry to NameNode, to address @@ -69,39 +69,18 @@ import static org.apache.sentry.hdfs.Updateable.Update; * Those out-of-sync situations may not be detectable via the exsiting sequence * numbers mechanism (most likely due to the implementation bugs). 
* <p> - * To facilitate signal-triggered full update from HMS to Sentry and from Sentry - * to the NameNode, the following 3 boolean variables are defined: - * fullUpdateHMS, fullUpdateHMSWait, and fullUpdateNN. - * <ol> - * <li> - * The purpose of fullUpdateHMS is to ensure that Sentry asks HMS for full - * update, and does so only once per signal. - * <li> - * The purpose of fullUpdateNN is to ensure that Sentry sends full update - * to NameNode, and does so only once per signal. - * <li> - * The purpose of fullUpdateHMSWait is to ensure that NN update only happens - * after HMS update. + * To facilitate signal-triggered full update from Sentry to NameNode, + * the boolean variables 'fullUpdateNN' is used to ensure that Sentry sends full + * update to NameNode, and does so only once per signal. * </ol> * The details: * <ol> * <li> - * Upon receiving a signal, fullUpdateHMS, fullUpdateHMSWait, and fullUpdateNN - * are all set to true. - * <li> - * On the next call to getLastSeenHMSPathSeqNum() from HMS, Sentry checks if - * fullUpdateHMS == true. If yes, it returns invalid (zero) sequence number - * to HMS, so HMS would push full update by calling handlePathUpdateNotification() - * next time. fullUpdateHMS is immediately reset to false, to only trigger one - * full update request to HMS per signal. - * <li> - * When HMS calls handlePathUpdateNotification(), Sentry checks if the update - * is a full image. If it is, fullUpdateHMSWait is set to false. + * Upon receiving a signal, fullUpdateNN is set to true. * <li> * When NameNode calls getAllPathsUpdatesFrom() asking for partial update, - * Sentry checks if both fullUpdateNN == true and fullUpdateHMSWait == false. - * If yes, it sends full update back to NameNode and immediately resets - * fullUpdateNN to false. + * Sentry checks if both fullUpdateNN == true. If yes, it sends full update back + * to NameNode and immediately resets fullUpdateNN to false. 
* </ol> */ @@ -109,18 +88,12 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen private static final Logger LOGGER = LoggerFactory.getLogger(SentryPlugin.class); - private final AtomicBoolean fullUpdateHMSWait = new AtomicBoolean(false); - private final AtomicBoolean fullUpdateHMS = new AtomicBoolean(false); private final AtomicBoolean fullUpdateNN = new AtomicBoolean(false); - public static volatile SentryPlugin instance; - private UpdateForwarder<PathsUpdate> pathsUpdater; - private UpdateForwarder<PermissionsUpdate> permsUpdater; - // TODO: Each perm change sequence number should be generated during persistence at sentry store. - private final AtomicLong permSeqNum = new AtomicLong(5); - private PermImageRetriever permImageRetriever; - private boolean outOfSync = false; + private DBUpdateForwarder<PathsUpdate> pathsUpdater; + private DBUpdateForwarder<PermissionsUpdate> permsUpdater; + /* * This number is smaller than starting sequence numbers used by NN and HMS * so in both cases its effect is to create appearance of out-of-sync @@ -130,33 +103,15 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen */ private static final long NO_LAST_SEEN_HMS_PATH_SEQ_NUM = 0L; - /* - * Call from HMS to get the last known update sequence #. 
- */ - long getLastSeenHMSPathSeqNum() { - if (!fullUpdateHMS.getAndSet(false)) { - return pathsUpdater.getLastSeen(); - } else { - LOGGER.info("SIGNAL HANDLING: asking for full update from HMS"); - return NO_LAST_SEEN_HMS_PATH_SEQ_NUM; - } - } - @Override public void initialize(Configuration conf, SentryStore sentryStore) throws SentryPluginException { - final String[] pathPrefixes = conf - .getStrings(ServerConfig.SENTRY_HDFS_INTEGRATION_PATH_PREFIXES, - ServerConfig.SENTRY_HDFS_INTEGRATION_PATH_PREFIXES_DEFAULT); - final int initUpdateRetryDelayMs = - conf.getInt(ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS, - ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT); - permImageRetriever = new PermImageRetriever(sentryStore); - - pathsUpdater = UpdateForwarder.create(conf, new UpdateableAuthzPaths( - pathPrefixes), new PathsUpdate(0, false), null, 100, initUpdateRetryDelayMs, false); - permsUpdater = UpdateForwarder.create(conf, - new UpdateablePermissions(permImageRetriever), new PermissionsUpdate(0, false), - permImageRetriever, 100, initUpdateRetryDelayMs, true); + PermImageRetriever permImageRetriever = new PermImageRetriever(sentryStore); + PathImageRetriever pathImageRetriever = new PathImageRetriever(sentryStore); + PermDeltaRetriever permDeltaRetriever = new PermDeltaRetriever(sentryStore); + PathDeltaRetriever pathDeltaRetriever = new PathDeltaRetriever(sentryStore); + pathsUpdater = new DBUpdateForwarder<>(pathImageRetriever, pathDeltaRetriever); + permsUpdater = new DBUpdateForwarder<>(permImageRetriever, permDeltaRetriever); + LOGGER.info("Sentry HDFS plugin initialized !!"); instance = this; @@ -182,7 +137,7 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen if (!fullUpdateNN.get()) { // Most common case - Sentry is NOT handling a full update. return pathsUpdater.getAllUpdatesFrom(pathSeqNum); - } else if (!fullUpdateHMSWait.get()) { + } else { /* * Sentry is in the middle of signal-triggered full update. 
* It already got a full update from HMS @@ -216,10 +171,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen LOGGER.warn("SIGNAL HANDLING: returned NULL instead of full update to NameNode (???)"); } return updates; - } else { - // Sentry is handling a full update, but not yet received full update from HMS - LOGGER.warn("SIGNAL HANDLING: sending partial update to NameNode: still waiting for full update from HMS"); - return pathsUpdater.getAllUpdatesFrom(pathSeqNum); } } @@ -227,32 +178,17 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen return permsUpdater.getAllUpdatesFrom(permSeqNum); } - /* - * Handle partial (most common) or full update from HMS - */ - public void handlePathUpdateNotification(PathsUpdate update) - throws SentryPluginException { - pathsUpdater.handleUpdateNotification(update); - if (!update.hasFullImage()) { // most common case of partial update - LOGGER.debug("Recieved Authz Path update [" + update.getSeqNum() + "].."); - } else { // rare case of full update - LOGGER.warn("Recieved Authz Path FULL update [" + update.getSeqNum() + "].."); - // indicate that we're ready to send full update to NameNode - fullUpdateHMSWait.set(false); - } - } - @Override public Update onAlterSentryRoleAddGroups( TAlterSentryRoleAddGroupsRequest request) throws SentryPluginException { - PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false); + PermissionsUpdate update = new PermissionsUpdate(); TRoleChanges rUpdate = update.addRoleUpdate(request.getRoleName()); for (TSentryGroup group : request.getGroups()) { rUpdate.addToAddGroups(group.getGroupName()); } - permsUpdater.handleUpdateNotification(update); - LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "].."); + LOGGER.debug(String.format("onAlterSentryRoleAddGroups, Authz Perm preUpdate[ %s ]", + request.getRoleName())); return update; } @@ -260,14 +196,14 @@ public class 
SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen public Update onAlterSentryRoleDeleteGroups( TAlterSentryRoleDeleteGroupsRequest request) throws SentryPluginException { - PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false); + PermissionsUpdate update = new PermissionsUpdate(); TRoleChanges rUpdate = update.addRoleUpdate(request.getRoleName()); for (TSentryGroup group : request.getGroups()) { rUpdate.addToDelGroups(group.getGroupName()); } - permsUpdater.handleUpdateNotification(update); - LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "].."); + LOGGER.debug(String.format("onAlterSentryRoleDeleteGroups, Authz Perm preUpdate [ %s ]", + request.getRoleName())); return update; } @@ -296,12 +232,12 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen return null; } - PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false); + PermissionsUpdate update = new PermissionsUpdate(); update.addPrivilegeUpdate(authzObj).putToAddPrivileges( roleName, privilege.getAction().toUpperCase()); - permsUpdater.handleUpdateNotification(update); - LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + "].."); + LOGGER.debug(String.format("onAlterSentryRoleGrantPrivilegeCore, Authz Perm preUpdate [ %s ]", + authzObj)); return update; } @@ -310,13 +246,13 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen throws SentryPluginException { String oldAuthz = HMSFollower.getAuthzObj(request.getOldAuthorizable()); String newAuthz = HMSFollower.getAuthzObj(request.getNewAuthorizable()); - PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false); + PermissionsUpdate update = new PermissionsUpdate(); TPrivilegeChanges privUpdate = update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS); privUpdate.putToAddPrivileges(newAuthz, newAuthz); privUpdate.putToDelPrivileges(oldAuthz, 
oldAuthz); - permsUpdater.handleUpdateNotification(update); - LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + newAuthz + ", " + oldAuthz + "].."); + LOGGER.debug(String.format("onRenameSentryPrivilege, Authz Perm preUpdate [ %s ]", + oldAuthz)); return update; } @@ -339,14 +275,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen } } - public boolean isOutOfSync() { - return outOfSync; - } - - public void setOutOfSync(boolean outOfSync) { - this.outOfSync = outOfSync; - } - private PermissionsUpdate onAlterSentryRoleRevokePrivilegeCore(String roleName, TSentryPrivilege privilege) throws SentryPluginException { String authzObj = getAuthzObj(privilege); @@ -354,46 +282,44 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen return null; } - PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false); + PermissionsUpdate update = new PermissionsUpdate(); update.addPrivilegeUpdate(authzObj).putToDelPrivileges( roleName, privilege.getAction().toUpperCase()); - permsUpdater.handleUpdateNotification(update); - LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + authzObj + "].."); + LOGGER.debug(String.format("onAlterSentryRoleRevokePrivilegeCore, Authz Perm preUpdate [ %s ]", + authzObj)); return update; } @Override public Update onDropSentryRole(TDropSentryRoleRequest request) throws SentryPluginException { - PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false); + PermissionsUpdate update = new PermissionsUpdate(); update.addPrivilegeUpdate(PermissionsUpdate.ALL_AUTHZ_OBJ).putToDelPrivileges( request.getRoleName(), PermissionsUpdate.ALL_AUTHZ_OBJ); update.addRoleUpdate(request.getRoleName()).addToDelGroups(PermissionsUpdate.ALL_GROUPS); - permsUpdater.handleUpdateNotification(update); - LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "].."); + 
LOGGER.debug(String.format("onDropSentryRole, Authz Perm preUpdate [ %s ]", + request.getRoleName())); return update; } @Override public Update onDropSentryPrivilege(TDropPrivilegesRequest request) throws SentryPluginException { - PermissionsUpdate update = new PermissionsUpdate(permSeqNum.incrementAndGet(), false); + PermissionsUpdate update = new PermissionsUpdate(); String authzObj = HMSFollower.getAuthzObj(request.getAuthorizable()); update.addPrivilegeUpdate(authzObj).putToDelPrivileges( PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES); - permsUpdater.handleUpdateNotification(update); - LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + authzObj + "].."); + LOGGER.debug(String.format("onDropSentryPrivilege, Authz Perm preUpdate [ %s ]", + authzObj)); return update; } @Override public void onSignal(final String sigName) { LOGGER.info("SIGNAL HANDLING: Received signal " + sigName + ", triggering full update"); - fullUpdateHMS.set(true); - fullUpdateHMSWait.set(true); fullUpdateNN.set(true); } http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java deleted file mode 100644 index 22c5769..0000000 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java +++ /dev/null @@ -1,335 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sentry.hdfs; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.hadoop.conf.Configuration; -import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class UpdateForwarder<K extends Updateable.Update> implements - Updateable<K>, Closeable { - - private final AtomicLong lastSeenSeqNum = new AtomicLong(0); - protected final AtomicLong lastCommittedSeqNum = new AtomicLong(0); - // Updates should be handled in order - private final Executor updateHandler = Executors.newSingleThreadExecutor(); - - // Update log is used when propagate updates to a downstream cache. - // The preUpdate log stores all commits that were applied to this cache. - // When the update log is filled to capacity (getMaxUpdateLogSize()), all - // entries are cleared and a compact image if the state of the cache is - // appended to the log. 
- // The first entry in an update log (consequently the first preUpdate a - // downstream cache sees) will be a full image. All subsequent entries are - // partial edits - protected final LinkedList<K> updateLog = new LinkedList<K>(); - // UpdateLog is disabled when getMaxUpdateLogSize() = 0; - private final int maxUpdateLogSize; - - private final ImageRetriever<K> imageRetreiver; - - private volatile Updateable<K> updateable; - - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - protected static final long INIT_SEQ_NUM = -2; - protected static final int INIT_UPDATE_RETRY_DELAY = 5000; - - private static final Logger LOGGER = LoggerFactory.getLogger(UpdateForwarder.class); - private static final String UPDATABLE_TYPE_NAME = "update_forwarder"; - - public UpdateForwarder(Configuration conf, Updateable<K> updateable, - ImageRetriever<K> imageRetreiver, int maxUpdateLogSize, boolean shouldInit) { - this(conf, updateable, imageRetreiver, maxUpdateLogSize, INIT_UPDATE_RETRY_DELAY, shouldInit); - } - - protected UpdateForwarder(Configuration conf, Updateable<K> updateable, //NOPMD - ImageRetriever<K> imageRetreiver, int maxUpdateLogSize, - int initUpdateRetryDelay, boolean shouldInit) { - this.maxUpdateLogSize = maxUpdateLogSize; - this.imageRetreiver = imageRetreiver; - if (shouldInit) { - spawnInitialUpdater(updateable, initUpdateRetryDelay); - } else { - this.updateable = updateable; - } - } - - public static <K extends Updateable.Update> UpdateForwarder<K> create(Configuration conf, - Updateable<K> updateable, K update, ImageRetriever<K> imageRetreiver, - int maxUpdateLogSize, boolean shouldInit) throws SentryPluginException { - return create(conf, updateable, update, imageRetreiver, maxUpdateLogSize, - INIT_UPDATE_RETRY_DELAY, shouldInit); - } - - public static <K extends Updateable.Update> UpdateForwarder<K> create(Configuration conf, - Updateable<K> updateable, K update, ImageRetriever<K> imageRetreiver, - int maxUpdateLogSize, int 
initUpdateRetryDelay, boolean shouldInit) throws SentryPluginException { - return new UpdateForwarder<K>(conf, updateable, imageRetreiver, - maxUpdateLogSize, initUpdateRetryDelay, shouldInit); - } - - private void spawnInitialUpdater(final Updateable<K> updateable, - final int initUpdateRetryDelay) { - K firstFullImage = null; - try { - firstFullImage = imageRetreiver.retrieveFullImage(INIT_SEQ_NUM); - } catch (Exception e) { - LOGGER.warn("InitialUpdater encountered exception !! ", e); - firstFullImage = null; - Thread initUpdater = new Thread() { - @Override - public void run() { - while (UpdateForwarder.this.updateable == null) { - try { - Thread.sleep(initUpdateRetryDelay); - } catch (InterruptedException e) { - LOGGER.warn("Thread interrupted !! ", e); - break; - } - K fullImage = null; - try { - fullImage = - UpdateForwarder.this.imageRetreiver - .retrieveFullImage(INIT_SEQ_NUM); - appendToUpdateLog(fullImage); - } catch (Exception e) { - LOGGER.warn("InitialUpdater encountered exception !! 
", e); - } - if (fullImage != null) { - UpdateForwarder.this.updateable = updateable.updateFull(fullImage); - } - } - } - }; - initUpdater.start(); - } - if (firstFullImage != null) { - try { - appendToUpdateLog(firstFullImage); - } catch (Exception e) { - LOGGER.warn("failed to update append log: ", e); - } - this.updateable = updateable.updateFull(firstFullImage); - } - } - /** - * Handle notifications from HMS plug-in or upstream Cache - * @param update - */ - public void handleUpdateNotification(final K update) throws SentryPluginException { - // Correct the seqNums on the first update - if (lastCommittedSeqNum.get() == INIT_SEQ_NUM) { - K firstUpdate = getUpdateLog().peek(); - long firstSeqNum = update.getSeqNum() - 1; - if (firstUpdate != null) { - firstUpdate.setSeqNum(firstSeqNum); - } - lastCommittedSeqNum.set(firstSeqNum); - lastSeenSeqNum.set(firstSeqNum); - } - final boolean editNotMissed = - lastSeenSeqNum.incrementAndGet() == update.getSeqNum(); - if (!editNotMissed) { - lastSeenSeqNum.set(update.getSeqNum()); - } - Runnable task = new Runnable() { - @Override - public void run() { - K toUpdate = update; - if (update.hasFullImage()) { - updateable = updateable.updateFull(update); - } else { - if (editNotMissed) { - // apply partial preUpdate - updateable.updatePartial(Collections.singletonList(update), lock); - } else { - // Retrieve full update from External Source and - if (imageRetreiver != null) { - try { - toUpdate = imageRetreiver - .retrieveFullImage(update.getSeqNum()); - } catch (Exception e) { - LOGGER.warn("failed to retrieve full image: ", e); - } - updateable = updateable.updateFull(toUpdate); - } - } - } - try { - appendToUpdateLog(toUpdate); - } catch (Exception e) { - LOGGER.warn("failed to append to update log", e); - } - } - }; - updateHandler.execute(task); - } - - protected void appendToUpdateLog(K update) throws Exception { - synchronized (getUpdateLog()) { - boolean logCompacted = false; - if (getMaxUpdateLogSize() > 0) { - if 
(update.hasFullImage() || getUpdateLog().size() == getMaxUpdateLogSize()) { - // Essentially a log compaction - getUpdateLog().clear(); - getUpdateLog().add(update.hasFullImage() ? update - : createFullImageUpdate(update.getSeqNum())); - logCompacted = true; - } else { - getUpdateLog().add(update); - } - } - lastCommittedSeqNum.set(update.getSeqNum()); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("#### Appending to Update Log [" - + "type=" + update.getClass() + ", " - + "lastCommit=" + lastCommittedSeqNum.get() + ", " - + "lastSeen=" + lastSeenSeqNum.get() + ", " - + "logCompacted=" + logCompacted + "]"); - } - } - } - - /** - * Return all updates from requested seqNum (inclusive) - * @param seqNum - * @return - */ - public List<K> getAllUpdatesFrom(long seqNum) throws Exception { - List<K> retVal = new LinkedList<K>(); - synchronized (getUpdateLog()) { - long currSeqNum = lastCommittedSeqNum.get(); - if (LOGGER.isDebugEnabled() && updateable != null) { - LOGGER.debug("#### GetAllUpdatesFrom [" - + "type=" + updateable.getClass() + ", " - + "reqSeqNum=" + seqNum + ", " - + "lastCommit=" + lastCommittedSeqNum.get() + ", " - + "lastSeen=" + lastSeenSeqNum.get() + ", " - + "getMaxUpdateLogSize()=" + getUpdateLog().size() + "]"); - } - if (getMaxUpdateLogSize() == 0) { - // no updatelog configured.. - return retVal; - } - K head = getUpdateLog().peek(); - if (head == null) { - return retVal; - } - if (seqNum > currSeqNum + 1) { - // This process has probably restarted since downstream - // recieved last update - retVal.addAll(getUpdateLog()); - return retVal; - } - if (head.getSeqNum() > seqNum) { - // Caller has diverged greatly.. 
- if (head.hasFullImage()) { - // head is a refresh(full) image - // Send full image along with partial updates - for (K u : getUpdateLog()) { - retVal.add(u); - } - } else { - // Create a full image - // clear updateLog - // add fullImage to head of Log - // NOTE : This should ideally never happen - K fullImage = createFullImageUpdate(currSeqNum); - getUpdateLog().clear(); - getUpdateLog().add(fullImage); - retVal.add(fullImage); - } - } else { - // increment iterator to requested seqNum - Iterator<K> iter = getUpdateLog().iterator(); - while (iter.hasNext()) { - K elem = iter.next(); - if (elem.getSeqNum() >= seqNum) { - retVal.add(elem); - } - } - } - } - return retVal; - } - - public boolean areAllUpdatesCommited() { - return lastCommittedSeqNum.get() == lastSeenSeqNum.get(); - } - - public long getLastCommitted() { - return lastCommittedSeqNum.get(); - } - - public long getLastSeen() { - return lastSeenSeqNum.get(); - } - - @Override - public Updateable<K> updateFull(K update) { - return (updateable != null) ? updateable.updateFull(update) : null; - } - - @Override - public void updatePartial(Iterable<K> updates, ReadWriteLock lock) { - if (updateable != null) { - updateable.updatePartial(updates, lock); - } - } - - @Override - public long getLastUpdatedSeqNum() { - return (updateable != null) ? updateable.getLastUpdatedSeqNum() : INIT_SEQ_NUM; - } - - @Override - public K createFullImageUpdate(long currSeqNum) throws Exception { - return (updateable != null) ? 
updateable.createFullImageUpdate(currSeqNum) : null; - } - - @Override - public String getUpdateableTypeName() { - // TODO Auto-generated method stub - return UPDATABLE_TYPE_NAME; - } - - protected LinkedList<K> getUpdateLog() { - return updateLog; - } - - protected int getMaxUpdateLogSize() { - return maxUpdateLogSize; - } - - @Override - public void close() throws IOException { - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/2811311e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java deleted file mode 100644 index 03c67d6..0000000 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.sentry.hdfs; - -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReadWriteLock; - -public class UpdateablePermissions implements Updateable<PermissionsUpdate>{ - private static final String UPDATABLE_TYPE_NAME = "perm_update"; - - private AtomicLong seqNum = new AtomicLong(); - private final ImageRetriever<PermissionsUpdate> imageRetreiver; - - public UpdateablePermissions( - ImageRetriever<PermissionsUpdate> imageRetreiver) { - this.imageRetreiver = imageRetreiver; - } - - @Override - public PermissionsUpdate createFullImageUpdate(long currSeqNum) throws Exception { - return imageRetreiver.retrieveFullImage(currSeqNum); - } - - @Override - public long getLastUpdatedSeqNum() { - return seqNum.get(); - } - - @Override - public void updatePartial(Iterable<PermissionsUpdate> update, - ReadWriteLock lock) { - for (PermissionsUpdate permsUpdate : update) { - seqNum.set(permsUpdate.getSeqNum()); - } - } - - @Override - public Updateable<PermissionsUpdate> updateFull(PermissionsUpdate update) { - UpdateablePermissions other = new UpdateablePermissions(imageRetreiver); - other.seqNum.set(update.getSeqNum()); - return other; - } - - @Override - public String getUpdateableTypeName() { - return UPDATABLE_TYPE_NAME; - } -}
