Repository: storm Updated Branches: refs/heads/master c2cb25b95 -> 809c4b2a9
STORM-2414 Skip checking meta's ACL when subject has write privileges for any blobs * also ignore FileNotFoundException while deleting local blob file since it is what we are about to do Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0c689b1c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0c689b1c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0c689b1c Branch: refs/heads/master Commit: 0c689b1c6d0df7ddf1c4cf4bb47fb29b08e0a14f Parents: c2cb25b Author: Jungtaek Lim <kabh...@gmail.com> Authored: Wed Mar 15 12:47:14 2017 +0900 Committer: Jungtaek Lim <kabh...@gmail.com> Committed: Mon Mar 20 17:39:25 2017 +0900 ---------------------------------------------------------------------- .../storm/blobstore/LocalFsBlobStore.java | 45 +++++++++++++++++--- 1 file changed, 38 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/0c689b1c/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java index 266d4b7..d1d68b6 100644 --- a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java +++ b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java @@ -130,9 +130,7 @@ public class LocalFsBlobStore extends BlobStore { @Override public AtomicOutputStream updateBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException { validateKey(key); - checkForBlobOrDownload(key); - SettableBlobMeta meta = getStoredBlobMeta(key); - _aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key); + checkPermission(key, who, WRITE); try { return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX+key, false)); } catch (IOException e) { @@ -220,14 +218,47 @@ public class LocalFsBlobStore extends BlobStore { @Override public void deleteBlob(String key, Subject who) throws AuthorizationException, KeyNotFoundException { validateKey(key); + + if (!_aclHandler.checkForValidUsers(who, WRITE)) { + // need to get ACL from meta + LOG.debug("Retrieving meta to get ACL info... key: {} subject: {}", key, who); + + try { + checkPermission(key, who, WRITE); + } catch (KeyNotFoundException e) { + LOG.error("Error while retrieving meta from ZK or local... key: {} subject: {}", key, who); + throw e; + } + } else { + // able to delete the blob without checking meta's ACL + // skip checking everything and continue deleting local files + LOG.debug("Given subject is eligible to delete key without checking ACL, skipping... key: {} subject: {}", + key, who); + } + + try { + deleteKeyIgnoringFileNotFound(DATA_PREFIX + key); + deleteKeyIgnoringFileNotFound(META_PREFIX + key); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void checkPermission(String key, Subject who, int mask) throws KeyNotFoundException, AuthorizationException { checkForBlobOrDownload(key); SettableBlobMeta meta = getStoredBlobMeta(key); - _aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key); + _aclHandler.hasPermissions(meta.get_acl(), mask, who, key); + } + + private void deleteKeyIgnoringFileNotFound(String key) throws IOException { try { - fbs.deleteKey(DATA_PREFIX+key); - fbs.deleteKey(META_PREFIX+key); + fbs.deleteKey(key); } catch (IOException e) { - throw new RuntimeException(e); + if (e instanceof FileNotFoundException) { + LOG.debug("Ignoring FileNotFoundException since we're about to delete such key... key: {}", key); + } else { + throw e; + } } }