Repository: sentry Updated Branches: refs/heads/master efe2e01b0 -> fab09a73b
SENTRY-1919: Sentry should prevent two snapshots from being sent to HDFS (Sergio Pena, reviewed by Na Li, Alexander Kolbasov, Vamsee Yarlagadda) Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/fab09a73 Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/fab09a73 Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/fab09a73 Branch: refs/heads/master Commit: fab09a73b62ffb50e6a88966da62aa309190b7fd Parents: efe2e01 Author: Sergio Pena <[email protected]> Authored: Wed Sep 6 09:52:10 2017 -0500 Committer: Sergio Pena <[email protected]> Committed: Wed Sep 6 09:52:10 2017 -0500 ---------------------------------------------------------------------- .../sentry/hdfs/SentryHDFSServiceProcessor.java | 117 ++++++++++++------- 1 file changed, 78 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/fab09a73/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java index 6221f3d..2866ab3 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import com.codahale.metrics.Timer.Context; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.sentry.hdfs.service.thrift.SentryHDFSService; import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateRequest; import org.apache.sentry.hdfs.service.thrift.TAuthzUpdateResponse; @@ -40,6 +41,10 @@ 
import org.slf4j.LoggerFactory; public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface { private static final Logger LOGGER = LoggerFactory.getLogger(SentryHDFSServiceProcessor.class); + // Flag to prevent multiple paths updates from being sent to HDFS NameNodes at the same time. + // This helps to reduce memory consumption on large path images. + private static final AtomicBoolean pathsRetrieverBusy = new AtomicBoolean(false); + @Override public TAuthzUpdateResponse get_all_authz_updates_from(long permSeqNum, long pathSeqNum) throws TException { throw new UnsupportedOperationException( @@ -58,54 +63,88 @@ public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface { return retVal; } - try (Context timerContext = - SentryHdfsMetricsUtil.getAllAuthzUpdatesTimer.time()){ - List<PermissionsUpdate> permUpdates = - SentryPlugin.instance.getAllPermsUpdatesFrom(request.getPermSeqNum()); - SentryHdfsMetricsUtil.getPermUpdateHistogram.update(permUpdates.size()); + try (Context timerContext = SentryHdfsMetricsUtil.getAllAuthzUpdatesTimer.time()) { + retVal.setAuthzPermUpdate(getPermissionsUpdatesFrom(request)); + retVal.setAuthzPathUpdate(getPathsUpdatesFrom(request)); + } catch (Exception e) { + LOGGER.error("Error Sending updates to downstream Cache", e); + throw new TException(e); + } + + return retVal; + } + + /** + * Retrieves and returns new requested permissions updates. + * + * @param request The requested thrift object that contains the permission seqNum + * @return A list of delta updates or a full image retrieved. It returns an empty list if there + * are no updates. + * @throws Exception If an error occurred while retrieving the updates. 
+ */ + private List<TPermissionsUpdate> getPermissionsUpdatesFrom(TAuthzUpdateRequest request) + throws Exception { + + LOGGER.debug("PERMISSIONS updates requested from HDFS [SeqNum={}]", + request.getPermSeqNum()); + + List<PermissionsUpdate> permUpdates = + SentryPlugin.instance.getAllPermsUpdatesFrom(request.getPermSeqNum()); + + List<TPermissionsUpdate> retPermUpdates = new ArrayList<>(permUpdates.size()); + for (PermissionsUpdate update : permUpdates) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Sending PERM preUpdate seq [{}], [{}]", + update.getSeqNum(), update.toThrift()); + } + + retPermUpdates.add(update.toThrift()); + } + + SentryHdfsMetricsUtil.getPermUpdateHistogram.update(permUpdates.size()); + return retPermUpdates; + } + + /** + * Retrieves and returns new requested paths updates. To avoid too much memory consumption + * with large full images, this method limits access to the requested updates + * to only one request at a time. + * + * @param request The requested thrift object that contains the paths seqNum and imgNum + * @return A list of delta updates or a full image retrieved. It returns an empty list if there + * are no updates or another request is happening at the same time. + * @throws Exception If an error occurred while retrieving the updates. 
+ */ + private List<TPathsUpdate> getPathsUpdatesFrom(TAuthzUpdateRequest request) throws Exception { + LOGGER.debug("PATHS updates requested from HDFS [ImgNum={}, SeqNum={}]", + request.getPathImgNum(), request.getPathSeqNum()); + + // If another paths updates request is in progress by another HDFS NameNode, then we + // return an empty image for now to avoid a large memory consumption + if (!pathsRetrieverBusy.compareAndSet(false, true)) { + LOGGER.debug("PATHS updates are not available because another request is in progress."); + return Collections.emptyList(); + } + + try { List<PathsUpdate> pathUpdates = - SentryPlugin.instance.getAllPathsUpdatesFrom(request.getPathSeqNum(), request.getPathImgNum()); - SentryHdfsMetricsUtil.getPathUpdateHistogram.update(pathUpdates.size()); + SentryPlugin.instance. + getAllPathsUpdatesFrom(request.getPathSeqNum(), request.getPathImgNum()); List<TPathsUpdate> retPathUpdates = new ArrayList<>(pathUpdates.size()); for (PathsUpdate update : pathUpdates) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Sending PATH preUpdate seq [{}], [{}]", - update.getSeqNum(), update.getImgNum()); - } + LOGGER.debug("Sending PATH preUpdate seq [{}], [{}]", + update.getSeqNum(), update.getImgNum()); retPathUpdates.add(update.toThrift()); } - retVal.setAuthzPathUpdate(retPathUpdates); - - List<TPermissionsUpdate>retPermUpdates = new ArrayList<>(permUpdates.size()); - for (PermissionsUpdate update : permUpdates) { - LOGGER.debug("Sending PERM preUpdate seq [{}], [{}]", - update.getSeqNum(), update.toThrift()); - retPermUpdates.add(update.toThrift()); - } - retVal.setAuthzPermUpdate(retPermUpdates); - if (LOGGER.isDebugEnabled()) { - StringBuilder permSeq = new StringBuilder("<"); - for (PermissionsUpdate permUpdate : permUpdates) { - permSeq.append(permUpdate.getSeqNum()).append(","); - } - permSeq.append(">"); - StringBuilder pathSeq = new StringBuilder("<"); - for (PathsUpdate pathUpdate : pathUpdates) { 
pathSeq.append(pathUpdate.getSeqNum()).append(","); - pathSeq.append(pathUpdate.getImgNum()).append(","); - } - pathSeq.append(">"); - LOGGER.debug("Updates requested from HDFS [permReq={}, permResp={}] [pathReq={}, pathResp={}]", - new Object[]{request.getPermSeqNum(), permSeq, request.getPathSeqNum(), pathSeq}); - } + SentryHdfsMetricsUtil.getPathUpdateHistogram.update(pathUpdates.size()); + return retPathUpdates; } catch (Exception e) { - LOGGER.error("Error Sending updates to downstream Cache", e); - throw new TException(e); + throw e; + } finally { + pathsRetrieverBusy.set(false); } - - return retVal; } @Override
