SENTRY-1613 Change-Id: I6ae45e29ab6d6987e07ef2fa54037732d47f4b8d
Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/18be1d5e Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/18be1d5e Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/18be1d5e Branch: refs/heads/sentry-ha-redesign-1 Commit: 18be1d5e8c0efcb3bbd6d21cd457f20f2075e301 Parents: 813d10e Author: hahao <[email protected]> Authored: Thu Jan 26 18:11:54 2017 -0800 Committer: hahao <[email protected]> Committed: Fri Jan 27 14:58:11 2017 -0800 ---------------------------------------------------------------------- .../org/apache/sentry/hdfs/ImageRetriever.java | 2 +- .../apache/sentry/hdfs/ThriftSerializer.java | 10 +- .../org/apache/sentry/hdfs/UpdateRetriever.java | 38 ++ .../apache/sentry/hdfs/DBUpdateForwarder.java | 67 ++++ .../apache/sentry/hdfs/PathImageRetriever.java | 15 +- .../apache/sentry/hdfs/PathUpdateRetriever.java | 58 +++ .../apache/sentry/hdfs/PermImageRetriever.java | 6 +- .../apache/sentry/hdfs/PermUpdateRetriever.java | 58 +++ .../sentry/hdfs/SentryHDFSServiceProcessor.java | 29 +- .../org/apache/sentry/hdfs/SentryPlugin.java | 77 +--- .../org/apache/sentry/hdfs/UpdateForwarder.java | 335 ----------------- .../sentry/hdfs/UpdateablePermissions.java | 63 ---- .../apache/sentry/hdfs/TestUpdateForwarder.java | 359 ------------------- .../db/service/persistent/SentryStore.java | 102 ++++++ .../sentry/service/thrift/SentryService.java | 3 +- .../tests/e2e/hdfs/TestHDFSIntegration.java | 6 +- 16 files changed, 362 insertions(+), 866 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java index 1147c07..5080559 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ImageRetriever.java @@ -29,6 +29,6 @@ public interface ImageRetriever<K> { * @return a full snapshot of type K * @throws Exception */ - K retrieveFullImage(long seqNum) throws Exception; + K retrieveFullImage() throws Exception; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java index 69aa098..d7b9923 100644 --- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ThriftSerializer.java @@ -25,12 +25,12 @@ import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TCompactProtocol; -import org.apache.thrift.protocol.TSimpleJSONProtocol; +import org.apache.thrift.protocol.TJSONProtocol; public class ThriftSerializer { - final static private TSimpleJSONProtocol.Factory tSimpleJSONProtocol = - new TSimpleJSONProtocol.Factory(); + final static private TJSONProtocol.Factory tJSONProtocol = + new TJSONProtocol.Factory(); // Use default max thrift message size here. // TODO: Figure out a way to make maxMessageSize configurable, eg. create a serializer singleton at startup by @@ -67,13 +67,13 @@ public class ThriftSerializer { public static String serializeToJSON(TBase base) throws TException { // Initiate a new TSerializer each time for thread safety. - TSerializer tSerializer = new TSerializer(tSimpleJSONProtocol); + TSerializer tSerializer = new TSerializer(tJSONProtocol); return tSerializer.toString(base); } public static void deserializeFromJSON(TBase base, String dataInJson) throws TException { // Initiate a new TDeserializer each time for thread safety. - TDeserializer tDeserializer = new TDeserializer(tSimpleJSONProtocol); + TDeserializer tDeserializer = new TDeserializer(tJSONProtocol); tDeserializer.fromString(base, dataInJson); } http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateRetriever.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateRetriever.java b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateRetriever.java new file mode 100644 index 0000000..ef25cfa --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/UpdateRetriever.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.sentry.hdfs; + +import java.util.List; + +/** + * Interface class for generating/retrieving a full image. + */ +public interface UpdateRetriever<K> { + + /** + * Retrieve a full image of type k. + * + * @param seqNum the given seq number + * @return a full snapshot of type K + * @throws Exception + */ + List<K> retrievePartialUpdate(long seqNum) throws Exception; + + boolean isPartialUpdateAvailable(long seqNum) throws Exception; +} http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java new file mode 100644 index 0000000..ca70a88 --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/DBUpdateForwarder.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.hdfs; + +import java.util.LinkedList; +import java.util.List; + +import org.apache.sentry.provider.db.service.persistent.SentryStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * DBUpdateForwarder is thread safe class + */ +public class DBUpdateForwarder<K extends Updateable.Update> { + + private final ImageRetriever<K> imageRetreiver; + private final UpdateRetriever<K> updateRetriever; + + private static final Logger LOGGER = LoggerFactory.getLogger(DBUpdateForwarder.class); + private static final String UPDATABLE_TYPE_NAME = "update_forwarder"; + + protected DBUpdateForwarder(final ImageRetriever<K> imageRetreiver, + final UpdateRetriever<K> updateRetriever) { + this.imageRetreiver = imageRetreiver; + this.updateRetriever = updateRetriever; + } + + /** + * Return all updates from requested seqNum (inclusive) + * + * @param seqNum + * @return the list of updates + */ + public List<K> getAllUpdatesFrom(long seqNum) throws Exception { + List<K> retVal = new LinkedList<>(); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("#### GetAllUpdatesFrom [" + + "reqSeqNum=" + seqNum + " ]"); + } + + if (seqNum == SentryStore.INIT_CHANGE_ID || + !updateRetriever.isPartialUpdateAvailable(seqNum)) { + retVal.add(imageRetreiver.retrieveFullImage()); + } else { + retVal.addAll(updateRetriever.retrievePartialUpdate(seqNum)); + } + + return retVal; + } +} http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java index 6cfd6d5..2edfcc1 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java @@ -18,13 +18,14 @@ package org.apache.sentry.hdfs; import com.codahale.metrics.Timer; +import com.google.common.collect.Lists; import org.apache.sentry.hdfs.service.thrift.TPathChanges; import org.apache.sentry.provider.db.service.persistent.PathsImage; import org.apache.sentry.provider.db.service.persistent.SentryStore; -import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; public class PathImageRetriever implements ImageRetriever<PathsUpdate> { @@ -35,7 +36,7 @@ public class PathImageRetriever implements ImageRetriever<PathsUpdate> { } @Override - public PathsUpdate retrieveFullImage(long seqNum) throws Exception { + public PathsUpdate retrieveFullImage() throws Exception { try (final Timer.Context timerContext = SentryHdfsMetricsUtil.getRetrievePathFullImageTimer.time()) { @@ -48,7 +49,7 @@ public class PathImageRetriever implements ImageRetriever<PathsUpdate> { // Generate a corresponding PathsUpdate. // TODO: use curSeqNum from DB instead of seqNum when doing SENTRY-1567 - PathsUpdate pathsUpdate = new PathsUpdate(seqNum, true); + PathsUpdate pathsUpdate = new PathsUpdate(curSeqNum, true); for (Map.Entry<String, Set<String>> pathEnt : pathImage.entrySet()) { TPathChanges pathChange = pathsUpdate.newPathChange(pathEnt.getKey()); @@ -59,7 +60,13 @@ public class PathImageRetriever implements ImageRetriever<PathsUpdate> { SentryHdfsMetricsUtil.getPathChangesHistogram.update(pathsUpdate .getPathChanges().size()); + + UpdateableAuthzPaths authzPaths = new UpdateableAuthzPaths( + new String[]{"/"}); + authzPaths.updatePartial(Lists.newArrayList(pathsUpdate), + new ReentrantReadWriteLock()); + pathsUpdate.toThrift().setPathsDump(authzPaths.getPathsDump().createPathsDump()); return pathsUpdate; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathUpdateRetriever.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathUpdateRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathUpdateRetriever.java new file mode 100644 index 0000000..469ffe7 --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathUpdateRetriever.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.hdfs; + + +import com.google.common.collect.Lists; +import org.apache.sentry.provider.db.service.model.MSentryPathChange; +import org.apache.sentry.provider.db.service.persistent.SentryStore; + +import java.util.List; + +public class PathUpdateRetriever implements UpdateRetriever<PathsUpdate> { + + private final SentryStore sentryStore; + + public PathUpdateRetriever(SentryStore sentryStore) { + this.sentryStore = sentryStore; + } + + @Override + public List<PathsUpdate> retrievePartialUpdate(long seqNum) throws Exception { + List<MSentryPathChange> mSentryPathChanges = + sentryStore.getMSentryPathChanges(seqNum); + List<PathsUpdate> updates = Lists.newArrayList(); + for (MSentryPathChange mSentryPathChange : mSentryPathChanges) { + // get changeID from stored MSentryPathChange + long changeID = mSentryPathChange.getChangeID(); + // Create a corresponding PathsUpdate and deserialize the stored + // JSON string to TPathsUpdate. Then set the corresponding + // changeID. + PathsUpdate pathsUpdate = new PathsUpdate(); + pathsUpdate.JSONDeserialize(mSentryPathChange.getPathChange()); + pathsUpdate.setSeqNum(changeID); + updates.add(pathsUpdate); + } + return updates; + } + + @Override + public boolean isPartialUpdateAvailable(long seqNum) throws Exception { + return sentryStore.findMSentryPathChangeByID(seqNum); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java index 56985c2..323d090 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermImageRetriever.java @@ -37,7 +37,7 @@ public class PermImageRetriever implements ImageRetriever<PermissionsUpdate> { } @Override - public PermissionsUpdate retrieveFullImage(long seqNum) throws Exception { + public PermissionsUpdate retrieveFullImage() throws Exception { try(Timer.Context timerContext = SentryHdfsMetricsUtil.getRetrievePermFullImageTimer.time()) { @@ -72,7 +72,7 @@ public class PermImageRetriever implements ImageRetriever<PermissionsUpdate> { PermissionsUpdate permissionsUpdate = new PermissionsUpdate(tPermUpdate); // TODO: use curSeqNum from DB instead of seqNum when doing SENTRY-1567 - permissionsUpdate.setSeqNum(seqNum); + permissionsUpdate.setSeqNum(curSeqNum); SentryHdfsMetricsUtil.getPrivilegeChangesHistogram.update( tPermUpdate.getPrivilegeChangesSize()); SentryHdfsMetricsUtil.getRoleChangesHistogram.update( @@ -81,4 +81,4 @@ public class PermImageRetriever implements ImageRetriever<PermissionsUpdate> { } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermUpdateRetriever.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermUpdateRetriever.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermUpdateRetriever.java new file mode 100644 index 0000000..e7e0e77 --- /dev/null +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PermUpdateRetriever.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.hdfs; + + +import com.google.common.collect.Lists; +import org.apache.sentry.provider.db.service.model.MSentryPermChange; +import org.apache.sentry.provider.db.service.persistent.SentryStore; + +import java.util.List; + +public class PermUpdateRetriever implements UpdateRetriever<PermissionsUpdate> { + + private final SentryStore sentryStore; + + public PermUpdateRetriever(SentryStore sentryStore) { + this.sentryStore = sentryStore; + } + + @Override + public List<PermissionsUpdate> retrievePartialUpdate(long seqNum) throws Exception { + List<MSentryPermChange> mSentryPermChanges = + sentryStore.getMSentryPermChanges(seqNum); + List<PermissionsUpdate> updates = Lists.newArrayList(); + for (MSentryPermChange mSentryPermChange : mSentryPermChanges) { + // get changeID from stored MSentryPermChange + long changeID = mSentryPermChange.getChangeID(); + // Create a corresponding PermissionsUpdate and deserialize the stored + // JSON string to TPermissionsUpdate. Then set the corresponding + // changeID. + PermissionsUpdate permsUpdate = new PermissionsUpdate(); + permsUpdate.JSONDeserialize(mSentryPermChange.getPermChange()); + permsUpdate.setSeqNum(changeID); + updates.add(permsUpdate); + } + return updates; + } + + @Override + public boolean isPartialUpdateAvailable(long seqNum) throws Exception { + return sentryStore.findMSentryPermChangeByID(seqNum); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java index e4f3f58..5b146b7 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryHDFSServiceProcessor.java @@ -42,10 +42,6 @@ public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface { retVal.setAuthzPathUpdate(new LinkedList<TPathsUpdate>()); retVal.setAuthzPermUpdate(new LinkedList<TPermissionsUpdate>()); if (SentryPlugin.instance != null) { - if (SentryPlugin.instance.isOutOfSync()) { - throw new TException( - "This Sentry server is not communicating with other nodes and out of sync "); - } final Timer.Context timerContext = SentryHdfsMetricsUtil.getAllAuthzUpdatesTimer.time(); try { @@ -99,33 +95,12 @@ public class SentryHDFSServiceProcessor implements SentryHDFSService.Iface { @Override public void handle_hms_notification(TPathsUpdate update) throws TException { - final Timer.Context timerContext = - SentryHdfsMetricsUtil.getHandleHmsNotificationTimer.time(); - try { - PathsUpdate hmsUpdate = new PathsUpdate(update); - if (SentryPlugin.instance != null) { - SentryPlugin.instance.handlePathUpdateNotification(hmsUpdate); - LOGGER.debug("Authz Paths update [" + hmsUpdate.getSeqNum() + "].."); - } else { - LOGGER.error("SentryPlugin not initialized yet !!"); - } - } catch (Exception e) { - LOGGER.error("Error handling notification from HMS", e); - SentryHdfsMetricsUtil.getFailedHandleHmsNotificationCounter.inc(); - throw new TException(e); - } finally { - timerContext.stop(); - SentryHdfsMetricsUtil.getHandleHmsPathChangeHistogram.update( - update.getPathChangesSize()); - if (update.isHasFullImage()) { - SentryHdfsMetricsUtil.getHandleHmsHasFullImageCounter.inc(); - } - } + throw new TException("handle_hms_notification is no longer supported"); } @Override public long check_hms_seq_num(long pathSeqNum) throws TException { - return SentryPlugin.instance.getLastSeenHMSPathSeqNum(); + throw new TException("check_hms_seq_num is no longer supported"); } /** http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java index b99013e..f5ee179 100644 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java +++ b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java @@ -108,18 +108,16 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen private static final Logger LOGGER = LoggerFactory.getLogger(SentryPlugin.class); - private final AtomicBoolean fullUpdateHMSWait = new AtomicBoolean(false); private final AtomicBoolean fullUpdateHMS = new AtomicBoolean(false); private final AtomicBoolean fullUpdateNN = new AtomicBoolean(false); public static volatile SentryPlugin instance; - private UpdateForwarder<PathsUpdate> pathsUpdater; - private UpdateForwarder<PermissionsUpdate> permsUpdater; + private DBUpdateForwarder<PathsUpdate> pathsUpdater; + private DBUpdateForwarder<PermissionsUpdate> permsUpdater; + // TODO: Each perm change sequence number should be generated during persistence at sentry store. private final AtomicLong permSeqNum = new AtomicLong(5); - private PermImageRetriever permImageRetriever; - private boolean outOfSync = false; /* * This number is smaller than starting sequence numbers used by NN and HMS * so in both cases its effect is to create appearance of out-of-sync @@ -129,33 +127,15 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen */ private static final long NO_LAST_SEEN_HMS_PATH_SEQ_NUM = 0L; - /* - * Call from HMS to get the last known update sequence #. - */ - long getLastSeenHMSPathSeqNum() { - if (!fullUpdateHMS.getAndSet(false)) { - return pathsUpdater.getLastSeen(); - } else { - LOGGER.info("SIGNAL HANDLING: asking for full update from HMS"); - return NO_LAST_SEEN_HMS_PATH_SEQ_NUM; - } - } - @Override public void initialize(Configuration conf, SentryStore sentryStore) throws SentryPluginException { - final String[] pathPrefixes = conf - .getStrings(ServerConfig.SENTRY_HDFS_INTEGRATION_PATH_PREFIXES, - ServerConfig.SENTRY_HDFS_INTEGRATION_PATH_PREFIXES_DEFAULT); - final int initUpdateRetryDelayMs = - conf.getInt(ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_MS, - ServerConfig.SENTRY_HDFS_INIT_UPDATE_RETRY_DELAY_DEFAULT); - permImageRetriever = new PermImageRetriever(sentryStore); - - pathsUpdater = UpdateForwarder.create(conf, new UpdateableAuthzPaths( - pathPrefixes), new PathsUpdate(0, false), null, 100, initUpdateRetryDelayMs, false); - permsUpdater = UpdateForwarder.create(conf, - new UpdateablePermissions(permImageRetriever), new PermissionsUpdate(0, false), - permImageRetriever, 100, initUpdateRetryDelayMs, true); + PermImageRetriever permImageRetriever = new PermImageRetriever(sentryStore); + PathImageRetriever pathImageRetriever = new PathImageRetriever(sentryStore); + PermUpdateRetriever permUpdateRetriever = new PermUpdateRetriever(sentryStore); + PathUpdateRetriever pathUpdateRetriever = new PathUpdateRetriever(sentryStore); + pathsUpdater = new DBUpdateForwarder<>(pathImageRetriever, pathUpdateRetriever); + permsUpdater = new DBUpdateForwarder<>(permImageRetriever, permUpdateRetriever); + LOGGER.info("Sentry HDFS plugin initialized !!"); instance = this; @@ -181,7 +161,7 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen if (!fullUpdateNN.get()) { // Most common case - Sentry is NOT handling a full update. return pathsUpdater.getAllUpdatesFrom(pathSeqNum); - } else if (!fullUpdateHMSWait.get()) { + } else { /* * Sentry is in the middle of signal-triggered full update. * It already got a full update from HMS @@ -215,10 +195,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen LOGGER.warn("SIGNAL HANDLING: returned NULL instead of full update to NameNode (???)"); } return updates; - } else { - // Sentry is handling a full update, but not yet received full update from HMS - LOGGER.warn("SIGNAL HANDLING: sending partial update to NameNode: still waiting for full update from HMS"); - return pathsUpdater.getAllUpdatesFrom(pathSeqNum); } } @@ -226,21 +202,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen return permsUpdater.getAllUpdatesFrom(permSeqNum); } - /* - * Handle partial (most common) or full update from HMS - */ - public void handlePathUpdateNotification(PathsUpdate update) - throws SentryPluginException { - pathsUpdater.handleUpdateNotification(update); - if (!update.hasFullImage()) { // most common case of partial update - LOGGER.debug("Recieved Authz Path update [" + update.getSeqNum() + "].."); - } else { // rare case of full update - LOGGER.warn("Recieved Authz Path FULL update [" + update.getSeqNum() + "].."); - // indicate that we're ready to send full update to NameNode - fullUpdateHMSWait.set(false); - } - } - @Override public DeltaTransactionBlock onAlterSentryRoleAddGroups( TAlterSentryRoleAddGroupsRequest request) throws SentryPluginException { @@ -250,7 +211,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen rUpdate.addToAddGroups(group.getGroupName()); } - permsUpdater.handleUpdateNotification(update); LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "].."); return new DeltaTransactionBlock(update); } @@ -265,7 +225,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen rUpdate.addToDelGroups(group.getGroupName()); } - permsUpdater.handleUpdateNotification(update); LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "].."); return new DeltaTransactionBlock(update); } @@ -301,7 +260,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen update.addPrivilegeUpdate(authzObj).putToAddPrivileges( roleName, privilege.getAction().toUpperCase()); - permsUpdater.handleUpdateNotification(update); LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + "].."); return update; } @@ -316,7 +274,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen privUpdate.putToAddPrivileges(newAuthz, newAuthz); privUpdate.putToDelPrivileges(oldAuthz, oldAuthz); - permsUpdater.handleUpdateNotification(update); LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + newAuthz + ", " + oldAuthz + "].."); return new DeltaTransactionBlock(update); } @@ -342,14 +299,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen } } - public boolean isOutOfSync() { - return outOfSync; - } - - public void setOutOfSync(boolean outOfSync) { - this.outOfSync = outOfSync; - } - private PermissionsUpdate onAlterSentryRoleRevokePrivilegeCore(String roleName, TSentryPrivilege privilege) throws SentryPluginException { String authzObj = getAuthzObj(privilege); @@ -361,7 +310,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen update.addPrivilegeUpdate(authzObj).putToDelPrivileges( roleName, privilege.getAction().toUpperCase()); - permsUpdater.handleUpdateNotification(update); LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + authzObj + "].."); return update; } @@ -374,7 +322,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen request.getRoleName(), PermissionsUpdate.ALL_AUTHZ_OBJ); update.addRoleUpdate(request.getRoleName()).addToDelGroups(PermissionsUpdate.ALL_GROUPS); - permsUpdater.handleUpdateNotification(update); LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + request.getRoleName() + "].."); return new DeltaTransactionBlock(update); } @@ -387,7 +334,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen update.addPrivilegeUpdate(authzObj).putToDelPrivileges( PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES); - permsUpdater.handleUpdateNotification(update); LOGGER.debug("Authz Perm preUpdate [" + update.getSeqNum() + ", " + authzObj + "].."); return new DeltaTransactionBlock(update); } @@ -396,7 +342,6 @@ public class SentryPlugin implements SentryPolicyStorePlugin, SigUtils.SigListen public void onSignal(final String sigName) { LOGGER.info("SIGNAL HANDLING: Received signal " + sigName + ", triggering full update"); fullUpdateHMS.set(true); - fullUpdateHMSWait.set(true); fullUpdateNN.set(true); } http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java deleted file mode 100644 index 4bfc473..0000000 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateForwarder.java +++ /dev/null @@ -1,335 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sentry.hdfs; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.hadoop.conf.Configuration; -import org.apache.sentry.provider.db.SentryPolicyStorePlugin.SentryPluginException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class UpdateForwarder<K extends Updateable.Update> implements - Updateable<K>, Closeable { - - private final AtomicLong lastSeenSeqNum = new AtomicLong(0); - protected final AtomicLong lastCommittedSeqNum = new AtomicLong(0); - // Updates should be handled in order - private final Executor updateHandler = Executors.newSingleThreadExecutor(); - - // Update log is used when propagate updates to a downstream cache. - // The preUpdate log stores all commits that were applied to this cache. - // When the update log is filled to capacity (getMaxUpdateLogSize()), all - // entries are cleared and a compact image if the state of the cache is - // appended to the log. - // The first entry in an update log (consequently the first preUpdate a - // downstream cache sees) will be a full image. All subsequent entries are - // partial edits - protected final LinkedList<K> updateLog = new LinkedList<K>(); - // UpdateLog is disabled when getMaxUpdateLogSize() = 0; - private final int maxUpdateLogSize; - - private final ImageRetriever<K> imageRetreiver; - - private volatile Updateable<K> updateable; - - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - protected static final long INIT_SEQ_NUM = -2; - protected static final int INIT_UPDATE_RETRY_DELAY = 5000; - - private static final Logger LOGGER = LoggerFactory.getLogger(UpdateForwarder.class); - private static final String UPDATABLE_TYPE_NAME = "update_forwarder"; - - public UpdateForwarder(Configuration conf, Updateable<K> updateable, - ImageRetriever<K> imageRetreiver, int maxUpdateLogSize, boolean shouldInit) { - this(conf, updateable, imageRetreiver, maxUpdateLogSize, INIT_UPDATE_RETRY_DELAY, shouldInit); - } - - protected UpdateForwarder(Configuration conf, Updateable<K> updateable, //NOPMD - ImageRetriever<K> imageRetreiver, int maxUpdateLogSize, - int initUpdateRetryDelay, boolean shouldInit) { - this.maxUpdateLogSize = maxUpdateLogSize; - this.imageRetreiver = imageRetreiver; - if (shouldInit) { - spawnInitialUpdater(updateable, initUpdateRetryDelay); - } else { - this.updateable = updateable; - } - } - - public static <K extends Updateable.Update> UpdateForwarder<K> create(Configuration conf, - Updateable<K> updateable, K update, ImageRetriever<K> imageRetreiver, - int maxUpdateLogSize, boolean shouldInit) throws SentryPluginException { - return create(conf, updateable, update, imageRetreiver, maxUpdateLogSize, - INIT_UPDATE_RETRY_DELAY, shouldInit); - } - - public static <K extends Updateable.Update> UpdateForwarder<K> create(Configuration conf, - Updateable<K> updateable, K update, ImageRetriever<K> imageRetreiver, - int maxUpdateLogSize, int initUpdateRetryDelay, boolean shouldInit) throws SentryPluginException { - return new UpdateForwarder<K>(conf, updateable, imageRetreiver, - maxUpdateLogSize, initUpdateRetryDelay, shouldInit); - } - - private void spawnInitialUpdater(final Updateable<K> updateable, - final int initUpdateRetryDelay) { - K firstFullImage = null; - try { - firstFullImage = imageRetreiver.retrieveFullImage(INIT_SEQ_NUM); - } catch (Exception e) { - LOGGER.warn("InitialUpdater encountered exception !! ", e); - firstFullImage = null; - Thread initUpdater = new Thread() { - @Override - public void run() { - while (UpdateForwarder.this.updateable == null) { - try { - Thread.sleep(initUpdateRetryDelay); - } catch (InterruptedException e) { - LOGGER.warn("Thread interrupted !! ", e); - break; - } - K fullImage = null; - try { - fullImage = - UpdateForwarder.this.imageRetreiver - .retrieveFullImage(INIT_SEQ_NUM); - appendToUpdateLog(fullImage); - } catch (Exception e) { - LOGGER.warn("InitialUpdater encountered exception !! ", e); - } - if (fullImage != null) { - UpdateForwarder.this.updateable = updateable.updateFull(fullImage); - } - } - } - }; - initUpdater.start(); - } - if (firstFullImage != null) { - try { - appendToUpdateLog(firstFullImage); - } catch (Exception e) { - LOGGER.warn("failed to update append log: ", e); - } - this.updateable = updateable.updateFull(firstFullImage); - } - } - /** - * Handle notifications from HMS plug-in or upstream Cache - * @param update - */ - public void handleUpdateNotification(final K update) throws SentryPluginException { - // Correct the seqNums on the first update - if (lastCommittedSeqNum.get() == INIT_SEQ_NUM) { - K firstUpdate = getUpdateLog().peek(); - long firstSeqNum = update.getSeqNum() - 1; - if (firstUpdate != null) { - firstUpdate.setSeqNum(firstSeqNum); - } - lastCommittedSeqNum.set(firstSeqNum); - lastSeenSeqNum.set(firstSeqNum); - } - final boolean editNotMissed = - lastSeenSeqNum.incrementAndGet() == update.getSeqNum(); - if (!editNotMissed) { - lastSeenSeqNum.set(update.getSeqNum()); - } - Runnable task = new Runnable() { - @Override - public void run() { - K toUpdate = update; - if (update.hasFullImage()) { - updateable = updateable.updateFull(update); - } else { - if (editNotMissed) { - // apply partial preUpdate - updateable.updatePartial(Collections.singletonList(update), lock); - } else { - // Retrieve full update from External Source and - if (imageRetreiver != null) { - try { - toUpdate = imageRetreiver - .retrieveFullImage(update.getSeqNum()); - } catch (Exception e) { - LOGGER.warn("failed to retrieve full image: ", e); - } - updateable = updateable.updateFull(toUpdate); - } - } - } - try { - appendToUpdateLog(toUpdate); - } catch (Exception e) { - LOGGER.warn("failed to append to update log", e); - } - } - }; - updateHandler.execute(task); - } - - protected void appendToUpdateLog(K update) throws Exception { - synchronized (getUpdateLog()) { - boolean logCompacted = false; - if (getMaxUpdateLogSize() > 0) { - if (update.hasFullImage() || getUpdateLog().size() == getMaxUpdateLogSize()) { - // Essentially a log compaction - getUpdateLog().clear(); - getUpdateLog().add(update.hasFullImage() ? update - : createFullImageUpdate(update.getSeqNum())); - logCompacted = true; - } else { - getUpdateLog().add(update); - } - } - lastCommittedSeqNum.set(update.getSeqNum()); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("#### Appending to Update Log [" - + "type=" + update.getClass() + ", " - + "lastCommit=" + lastCommittedSeqNum.get() + ", " - + "lastSeen=" + lastSeenSeqNum.get() + ", " - + "logCompacted=" + logCompacted + "]"); - } - } - } - - /** - * Return all updates from requested seqNum (inclusive) - * @param seqNum - * @return - */ - public List<K> getAllUpdatesFrom(long seqNum) throws Exception { - List<K> retVal = new LinkedList<K>(); - synchronized (getUpdateLog()) { - long currSeqNum = lastCommittedSeqNum.get(); - if (LOGGER.isDebugEnabled() && updateable != null) { - LOGGER.debug("#### GetAllUpdatesFrom [" - + "type=" + updateable.getClass() + ", " - + "reqSeqNum=" + seqNum + ", " - + "lastCommit=" + lastCommittedSeqNum.get() + ", " - + "lastSeen=" + lastSeenSeqNum.get() + ", " - + "getMaxUpdateLogSize()=" + getUpdateLog().size() + "]"); - } - if (getMaxUpdateLogSize() == 0) { - // no updatelog configured.. - return retVal; - } - K head = getUpdateLog().peek(); - if (head == null) { - return retVal; - } - if (seqNum > currSeqNum + 1) { - // This process has probably restarted since downstream - // recieved last update - retVal.addAll(getUpdateLog()); - return retVal; - } - if (head.getSeqNum() > seqNum) { - // Caller has diverged greatly.. - if (head.hasFullImage()) { - // head is a refresh(full) image - // Send full image along with partial updates - for (K u : getUpdateLog()) { - retVal.add(u); - } - } else { - // Create a full image - // clear updateLog - // add fullImage to head of Log - // NOTE : This should ideally never happen - K fullImage = createFullImageUpdate(currSeqNum); - getUpdateLog().clear(); - getUpdateLog().add(fullImage); - retVal.add(fullImage); - } - } else { - // increment iterator to requested seqNum - Iterator<K> iter = getUpdateLog().iterator(); - while (iter.hasNext()) { - K elem = iter.next(); - if (elem.getSeqNum() >= seqNum) { - retVal.add(elem); - } - } - } - } - return retVal; - } - - public boolean areAllUpdatesCommited() { - return lastCommittedSeqNum.get() == lastSeenSeqNum.get(); - } - - public long getLastCommitted() { - return lastCommittedSeqNum.get(); - } - - public long getLastSeen() { - return lastSeenSeqNum.get(); - } - - @Override - public Updateable<K> updateFull(K update) { - return (updateable != null) ? updateable.updateFull(update) : null; - } - - @Override - public void updatePartial(Iterable<K> updates, ReadWriteLock lock) { - if (updateable != null) { - updateable.updatePartial(updates, lock); - } - } - - @Override - public long getLastUpdatedSeqNum() { - return (updateable != null) ? updateable.getLastUpdatedSeqNum() : INIT_SEQ_NUM; - } - - @Override - public K createFullImageUpdate(long currSeqNum) throws Exception { - return (updateable != null) ? updateable.createFullImageUpdate(currSeqNum) : null; - } - - @Override - public String getUpdateableTypeName() { - // TODO Auto-generated method stub - return UPDATABLE_TYPE_NAME; - } - - protected LinkedList<K> getUpdateLog() { - return updateLog; - } - - protected int getMaxUpdateLogSize() { - return maxUpdateLogSize; - } - - @Override - public void close() throws IOException { - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java deleted file mode 100644 index 03c67d6..0000000 --- a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/UpdateablePermissions.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sentry.hdfs; - -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReadWriteLock; - -public class UpdateablePermissions implements Updateable<PermissionsUpdate>{ - private static final String UPDATABLE_TYPE_NAME = "perm_update"; - - private AtomicLong seqNum = new AtomicLong(); - private final ImageRetriever<PermissionsUpdate> imageRetreiver; - - public UpdateablePermissions( - ImageRetriever<PermissionsUpdate> imageRetreiver) { - this.imageRetreiver = imageRetreiver; - } - - @Override - public PermissionsUpdate createFullImageUpdate(long currSeqNum) throws Exception { - return imageRetreiver.retrieveFullImage(currSeqNum); - } - - @Override - public long getLastUpdatedSeqNum() { - return seqNum.get(); - } - - @Override - public void updatePartial(Iterable<PermissionsUpdate> update, - ReadWriteLock lock) { - for (PermissionsUpdate permsUpdate : update) { - seqNum.set(permsUpdate.getSeqNum()); - } - } - - @Override - public Updateable<PermissionsUpdate> updateFull(PermissionsUpdate update) { - UpdateablePermissions other = new UpdateablePermissions(imageRetreiver); - other.seqNum.set(update.getSeqNum()); - return other; - } - - @Override - public String getUpdateableTypeName() { - return UPDATABLE_TYPE_NAME; - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java ---------------------------------------------------------------------- diff --git a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java b/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java deleted file mode 100644 index d12b134..0000000 --- a/sentry-hdfs/sentry-hdfs-service/src/test/java/org/apache/sentry/hdfs/TestUpdateForwarder.java +++ /dev/null @@ -1,359 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.sentry.hdfs; - -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.locks.ReadWriteLock; - -import org.apache.thrift.TException; -import org.junit.Assert; - -import org.apache.hadoop.conf.Configuration; -import org.apache.sentry.hdfs.Updateable.Update; -import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig; -import org.junit.After; -import org.junit.Assume; -import org.junit.Test; - -import com.google.common.base.Joiner; -import com.google.common.collect.Lists; - -public class TestUpdateForwarder { - - public static class DummyUpdate implements Update { - private long seqNum = 0; - private boolean hasFullUpdate = false; - private String state; - public DummyUpdate() { - this(0, false); - } - public DummyUpdate(long seqNum, boolean hasFullUpdate) { - this.seqNum = seqNum; - this.hasFullUpdate = hasFullUpdate; - } - public String getState() { - return state; - } - public DummyUpdate setState(String stuff) { - this.state = stuff; - return this; - } - @Override - public boolean hasFullImage() { - return hasFullUpdate; - } - @Override - public long getSeqNum() { - return seqNum; - } - @Override - public void setSeqNum(long seqNum) { - this.seqNum = seqNum; - } - @Override - public byte[] serialize() throws IOException { - return state.getBytes(); - } - - @Override - public void deserialize(byte[] data) throws IOException { - state = new String(data); - } - - @Override - public String JSONSerialize() throws TException { - return state; - } - - @Override - public void JSONDeserialize(String update) throws TException { - state = new String(update); - } - } - - static class DummyUpdatable implements Updateable<DummyUpdate> { - - private List<String> state = new LinkedList<String>(); - private long lastUpdatedSeqNum = 0; - - @Override - public void updatePartial(Iterable<DummyUpdate> update, ReadWriteLock lock) { - for (DummyUpdate u : update) { - state.add(u.getState()); - lastUpdatedSeqNum = u.seqNum; - } - } - - @Override - public Updateable<DummyUpdate> updateFull(DummyUpdate update) { - DummyUpdatable retVal = new DummyUpdatable(); - retVal.lastUpdatedSeqNum = update.seqNum; - retVal.state = Lists.newArrayList(update.state.split(",")); - return retVal; - } - - @Override - public long getLastUpdatedSeqNum() { - return lastUpdatedSeqNum; - } - - @Override - public DummyUpdate createFullImageUpdate(long currSeqNum) { - DummyUpdate retVal = new DummyUpdate(currSeqNum, true); - retVal.state = Joiner.on(",").join(state); - return retVal; - } - - public String getState() { - return Joiner.on(",").join(state); - } - - @Override - public String getUpdateableTypeName() { - // TODO Auto-generated method stub - return "DummyUpdator"; - } - } - - static class DummyImageRetreiver implements ImageRetriever<DummyUpdate> { - - private String state; - public void setState(String state) { - this.state = state; - } - @Override - public DummyUpdate retrieveFullImage(long currSeqNum) { - DummyUpdate retVal = new DummyUpdate(currSeqNum, true); - retVal.state = state; - return retVal; - } - } - - protected Configuration testConf = new Configuration(); - protected UpdateForwarder<DummyUpdate> updateForwarder; - - @After - public void cleanup() throws Exception { - if (updateForwarder != null) { - updateForwarder.close(); - updateForwarder = null; - } - } - - @Test - public void testInit() throws Exception { - DummyImageRetreiver imageRetreiver = new DummyImageRetreiver(); - imageRetreiver.setState("a,b,c"); - updateForwarder = UpdateForwarder.create( - testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 10, true); - Assert.assertEquals(-2, updateForwarder.getLastUpdatedSeqNum()); - List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0); - Assert.assertTrue(allUpdates.size() == 1); - Assert.assertEquals("a,b,c", allUpdates.get(0).getState()); - - // If the current process has restarted the input seqNum will be > currSeq - allUpdates = updateForwarder.getAllUpdatesFrom(100); - Assert.assertTrue(allUpdates.size() == 1); - Assert.assertEquals("a,b,c", allUpdates.get(0).getState()); - Assert.assertEquals(-2, allUpdates.get(0).getSeqNum()); - allUpdates = updateForwarder.getAllUpdatesFrom(-1); - Assert.assertEquals(0, allUpdates.size()); - } - - @Test - public void testUpdateReceive() throws Exception { - DummyImageRetreiver imageRetreiver = new DummyImageRetreiver(); - imageRetreiver.setState("a,b,c"); - updateForwarder = UpdateForwarder.create( - testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5, true); - updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d")); - while(!updateForwarder.areAllUpdatesCommited()) { - Thread.sleep(100); - } - Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum()); - List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0); - Assert.assertEquals(2, allUpdates.size()); - Assert.assertEquals("a,b,c", allUpdates.get(0).getState()); - Assert.assertEquals("d", allUpdates.get(1).getState()); - } - - // This happens when we the first update from HMS is a -1 (If the heartbeat - // thread checks Sentry's current seqNum before any update has come in).. - // This will lead the first and second entries in the updatelog to differ - // by more than +1.. - @Test - public void testUpdateReceiveWithNullImageRetriver() throws Exception { - Assume.assumeTrue(!testConf.getBoolean(ServerConfig.SENTRY_HA_ENABLED, - false)); - updateForwarder = UpdateForwarder.create( - testConf, new DummyUpdatable(), new DummyUpdate(), null, 5, false); - updateForwarder.handleUpdateNotification(new DummyUpdate(-1, true).setState("a")); - while(!updateForwarder.areAllUpdatesCommited()) { - Thread.sleep(100); - } - List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(1); - Assert.assertEquals("a", allUpdates.get(0).getState()); - updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("b")); - while(!updateForwarder.areAllUpdatesCommited()) { - Thread.sleep(100); - } - updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("c")); - while(!updateForwarder.areAllUpdatesCommited()) { - Thread.sleep(100); - } - Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum()); - allUpdates = updateForwarder.getAllUpdatesFrom(0); - Assert.assertEquals(2, allUpdates.size()); - Assert.assertEquals("b", allUpdates.get(0).getState()); - Assert.assertEquals("c", allUpdates.get(1).getState()); - } - - @Test - public void testGetUpdates() throws Exception { - DummyImageRetreiver imageRetreiver = new DummyImageRetreiver(); - imageRetreiver.setState("a,b,c"); - updateForwarder = UpdateForwarder.create( - testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5, true); - updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d")); - while(!updateForwarder.areAllUpdatesCommited()) { - Thread.sleep(100); - } - Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum()); - List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0); - Assert.assertEquals(2, allUpdates.size()); - - updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e")); - updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f")); - - while(!updateForwarder.areAllUpdatesCommited()) { - Thread.sleep(100); - } - Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum()); - allUpdates = updateForwarder.getAllUpdatesFrom(0); - Assert.assertEquals(4, allUpdates.size()); - Assert.assertEquals("a,b,c", allUpdates.get(0).getState()); - Assert.assertEquals(4, allUpdates.get(0).getSeqNum()); - Assert.assertEquals("d", allUpdates.get(1).getState()); - Assert.assertEquals(5, allUpdates.get(1).getSeqNum()); - Assert.assertEquals("e", allUpdates.get(2).getState()); - Assert.assertEquals(6, allUpdates.get(2).getSeqNum()); - Assert.assertEquals("f", allUpdates.get(3).getState()); - Assert.assertEquals(7, allUpdates.get(3).getSeqNum()); - - updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g")); - while(!updateForwarder.areAllUpdatesCommited()) { - Thread.sleep(100); - } - Assert.assertEquals(8, updateForwarder.getLastUpdatedSeqNum()); - allUpdates = updateForwarder.getAllUpdatesFrom(8); - Assert.assertEquals(1, allUpdates.size()); - Assert.assertEquals("g", allUpdates.get(0).getState()); - } - - @Test - public void testGetUpdatesAfterExternalEntityReset() throws Exception { - /* - * Disabled for Sentry HA. Since the sequence numbers are trakced in ZK, the - * lower sequence updates are ignored which causes this test to fail in HA - * mode - */ - Assume.assumeTrue(!testConf.getBoolean(ServerConfig.SENTRY_HA_ENABLED, - false)); - - DummyImageRetreiver imageRetreiver = new DummyImageRetreiver(); - imageRetreiver.setState("a,b,c"); - updateForwarder = UpdateForwarder.create( - testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5, true); - updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d")); - while(!updateForwarder.areAllUpdatesCommited()) { - Thread.sleep(100); - } - - updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e")); - updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f")); - - while(!updateForwarder.areAllUpdatesCommited()) { - Thread.sleep(100); - } - Assert.assertEquals(7, updateForwarder.getLastUpdatedSeqNum()); - List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0); - Assert.assertEquals(4, allUpdates.size()); - Assert.assertEquals("f", allUpdates.get(3).getState()); - Assert.assertEquals(7, allUpdates.get(3).getSeqNum()); - - updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g")); - while(!updateForwarder.areAllUpdatesCommited()) { - Thread.sleep(100); - } - Assert.assertEquals(8, updateForwarder.getLastUpdatedSeqNum()); - allUpdates = updateForwarder.getAllUpdatesFrom(8); - Assert.assertEquals(1, allUpdates.size()); - Assert.assertEquals("g", allUpdates.get(0).getState()); - - imageRetreiver.setState("a,b,c,d,e,f,g,h"); - - // New update comes with SeqNum = 1 - updateForwarder.handleUpdateNotification(new DummyUpdate(1, false).setState("h")); - while(!updateForwarder.areAllUpdatesCommited()) { - Thread.sleep(100); - } - // NN plugin asks for next update - allUpdates = updateForwarder.getAllUpdatesFrom(9); - Assert.assertEquals(1, allUpdates.size()); - Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getState()); - // Assert.assertEquals(1, allUpdates.get(0).getSeqNum()); - } - - @Test - public void testUpdateLogCompression() throws Exception { - DummyImageRetreiver imageRetreiver = new DummyImageRetreiver(); - imageRetreiver.setState("a,b,c"); - updateForwarder = UpdateForwarder.create( - testConf, new DummyUpdatable(), new DummyUpdate(), imageRetreiver, 5, true); - updateForwarder.handleUpdateNotification(new DummyUpdate(5, false).setState("d")); - while(!updateForwarder.areAllUpdatesCommited()) { - Thread.sleep(100); - } - Assert.assertEquals(5, updateForwarder.getLastUpdatedSeqNum()); - List<DummyUpdate> allUpdates = updateForwarder.getAllUpdatesFrom(0); - Assert.assertEquals(2, allUpdates.size()); - - updateForwarder.handleUpdateNotification(new DummyUpdate(6, false).setState("e")); - updateForwarder.handleUpdateNotification(new DummyUpdate(7, false).setState("f")); - updateForwarder.handleUpdateNotification(new DummyUpdate(8, false).setState("g")); - updateForwarder.handleUpdateNotification(new DummyUpdate(9, false).setState("h")); - updateForwarder.handleUpdateNotification(new DummyUpdate(10, false).setState("i")); - updateForwarder.handleUpdateNotification(new DummyUpdate(11, false).setState("j")); - - while(!updateForwarder.areAllUpdatesCommited()) { - Thread.sleep(100); - } - Assert.assertEquals(11, updateForwarder.getLastUpdatedSeqNum()); - allUpdates = updateForwarder.getAllUpdatesFrom(0); - Assert.assertEquals(3, allUpdates.size()); - Assert.assertEquals("a,b,c,d,e,f,g,h", allUpdates.get(0).getState()); - Assert.assertEquals(9, allUpdates.get(0).getSeqNum()); - Assert.assertEquals("i", allUpdates.get(1).getState()); - Assert.assertEquals(10, allUpdates.get(1).getSeqNum()); - Assert.assertEquals("j", allUpdates.get(2).getState()); - Assert.assertEquals(11, allUpdates.get(2).getSeqNum()); - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java index 3536579..d1edcb1 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java @@ -3404,6 +3404,33 @@ public class SentryStore { } /** + * Get the list of MSentryPermChange objects greater than and + * equal with the given ChangeID. + * + * @param changeID + * @return the list of MSentryPermChange objects + * @throws Exception + */ + public List<MSentryPermChange> getMSentryPermChanges(final long changeID) + throws Exception { + return tm.executeTransaction( + new TransactionBlock<List<MSentryPermChange>>() { + public List<MSentryPermChange> execute(PersistenceManager pm) throws Exception { + Query query = pm.newQuery(MSentryPermChange.class); + query.setFilter("this.changeID >= t"); + query.declareParameters("long t"); + List<MSentryPermChange> permChanges = + (List<MSentryPermChange>)query.execute(changeID); + if (permChanges == null) { + noSuchUpdate(changeID); + } + + return permChanges; + } + }); + } + + /** * Get the MSentryPermChange object by ChangeID. * * @param changeID the given changeID. @@ -3429,6 +3456,57 @@ public class SentryStore { } /** + * Find the MSentryPermChange object by ChangeID. + * + * @param changeID + * @return true if found the MSentryPermChange object, otherwise false. + * @throws Exception + */ + public boolean findMSentryPermChangeByID(final long changeID) throws Exception { + return tm.executeTransaction( + new TransactionBlock<Boolean>() { + public Boolean execute(PersistenceManager pm) throws Exception { + Query query = pm.newQuery(MSentryPermChange.class); + query.setFilter("this.changeID == t"); + query.declareParameters("long t"); + List<MSentryPermChange> pathChanges = + (List<MSentryPermChange>)query.execute(changeID); + if (pathChanges == null) { + return false; + } else { + return true; + } + } + }); + } + + /** + * Get the list of MSentryPathChange objects greater than and + * equal with the given ChangeID. + * + * @param changeID + * @return the list of MSentryPathChange objects + * @throws Exception + */ + public List<MSentryPathChange> getMSentryPathChanges(final long changeID) + throws Exception { + return tm.executeTransaction( + new TransactionBlock<List<MSentryPathChange>>() { + public List<MSentryPathChange> execute(PersistenceManager pm) throws Exception { + Query query = pm.newQuery(MSentryPathChange.class); + query.setFilter("this.changeID >= t"); + query.declareParameters("long t"); + List<MSentryPathChange> pathChanges = + (List<MSentryPathChange>)query.execute(changeID); + if (pathChanges == null) { + noSuchUpdate(changeID); + } + return pathChanges; + } + }); + } + + /** * Get the MSentryPathChange object by ChangeID. */ public MSentryPathChange getMSentryPathChangeByID(final long changeID) throws Exception { @@ -3451,6 +3529,30 @@ public class SentryStore { } /** + * Find the MSentryPathChange object by ChangeID. + * + * @param changeID + * @return true if found the MSentryPathChange object, otherwise false. + * @throws Exception + */ + public boolean findMSentryPathChangeByID(final long changeID) throws Exception { + return tm.executeTransaction( + new TransactionBlock<Boolean>() { + public Boolean execute(PersistenceManager pm) throws Exception { + Query query = pm.newQuery(MSentryPathChange.class); + query.setFilter("this.changeID == t"); + query.declareParameters("long t"); + List<MSentryPathChange> pathChanges = + (List<MSentryPathChange>)query.execute(changeID); + if (pathChanges == null) { + return false; + } else { + return true; + } + } + }); + } + /** * Execute Perm/Path UpdateTransaction and corresponding actual * action transaction, e.g dropSentryRole, in a single transaction. * The order of the transaction does not matter because there is no http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java index be59c1e..7d818c1 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java @@ -158,8 +158,9 @@ public class SentryService implements Callable, SigUtils.SigListener { if (notificationLogEnabled) { try { hmsFollowerExecutor = Executors.newScheduledThreadPool(1); + //TODO: make initialDelay and period time to be configurable hmsFollowerExecutor.scheduleAtFixedRate(new HMSFollower(conf, leaderMonitor), - 60000, 500, TimeUnit.MILLISECONDS); + 30000, 500, TimeUnit.MILLISECONDS); } catch (Exception e) { //TODO: Handle LOGGER.error("Could not start HMSFollower"); http://git-wip-us.apache.org/repos/asf/sentry/blob/18be1d5e/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java index 4f4d3e6..7b6d753 100644 --- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java +++ b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/hdfs/TestHDFSIntegration.java @@ -221,7 +221,7 @@ public class TestHDFSIntegration { @Override public Void run() throws Exception { HiveConf hiveConf = new HiveConf(); - hiveConf.set("sentry.metastore.plugins", "org.apache.sentry.hdfs.MetastorePlugin"); + //hiveConf.set("sentry.metastore.plugins", "org.apache.sentry.hdfs.MetastorePlugin"); hiveConf.set("sentry.service.client.server.rpc-address", "localhost"); hiveConf.set("sentry.hdfs.service.client.server.rpc-address", "localhost"); hiveConf.set("sentry.hdfs.service.client.server.rpc-port", String.valueOf(sentryPort)); @@ -261,7 +261,8 @@ public class TestHDFSIntegration { hiveConf.set("hive.metastore.authorization.storage.checks", "true"); hiveConf.set("hive.metastore.uris", "thrift://localhost:" + hmsPort); hiveConf.set("hive.metastore.pre.event.listeners", "org.apache.sentry.binding.metastore.MetastoreAuthzBinding"); - hiveConf.set("hive.metastore.event.listeners", "org.apache.sentry.binding.metastore.SentryMetastorePostEventListener"); + hiveConf.set("hive.metastore.event.listeners", "org.apache.sentry.binding.metastore.SentryMetastorePostEventListenerNotificationLog"); + hiveConf.set("hcatalog.message.factory.impl.json","org.apache.sentry.binding.metastore.messaging.json.SentryJSONMessageFactory"); hiveConf.set("hive.security.authorization.task.factory", "org.apache.sentry.binding.hive.SentryHiveAuthorizationTaskFactoryImpl"); hiveConf.set("hive.server2.session.hook", "org.apache.sentry.binding.hive.HiveAuthzBindingSessionHook"); hiveConf.set("sentry.metastore.service.users", "hive");// queries made by hive user (beeline) skip meta store check @@ -451,6 +452,7 @@ public class TestHDFSIntegration { properties.put(ServerConfig.RPC_ADDRESS, "localhost"); properties.put(ServerConfig.RPC_PORT, String.valueOf(sentryPort > 0 ? sentryPort : 0)); properties.put(ServerConfig.SENTRY_VERIFY_SCHEM_VERSION, "false"); + properties.put(ServerConfig.SENTRY_NOTIFICATION_LOG_ENABLED,"true"); properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING, ServerConfig.SENTRY_STORE_LOCAL_GROUP_MAPPING); properties.put(ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE, policyFileLocation.getPath());
