Repository: storm
Updated Branches:
  refs/heads/master c2cb25b95 -> 809c4b2a9


STORM-2414 Skip checking meta's ACL when subject has write privileges for any 
blobs

* also ignore FileNotFoundException while deleting local blob file since it is 
what we are about to do


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0c689b1c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0c689b1c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0c689b1c

Branch: refs/heads/master
Commit: 0c689b1c6d0df7ddf1c4cf4bb47fb29b08e0a14f
Parents: c2cb25b
Author: Jungtaek Lim <kabh...@gmail.com>
Authored: Wed Mar 15 12:47:14 2017 +0900
Committer: Jungtaek Lim <kabh...@gmail.com>
Committed: Mon Mar 20 17:39:25 2017 +0900

----------------------------------------------------------------------
 .../storm/blobstore/LocalFsBlobStore.java       | 45 +++++++++++++++++---
 1 file changed, 38 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0c689b1c/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java 
b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
index 266d4b7..d1d68b6 100644
--- a/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-core/src/jvm/org/apache/storm/blobstore/LocalFsBlobStore.java
@@ -130,9 +130,7 @@ public class LocalFsBlobStore extends BlobStore {
     @Override
     public AtomicOutputStream updateBlob(String key, Subject who) throws 
AuthorizationException, KeyNotFoundException {
         validateKey(key);
-        checkForBlobOrDownload(key);
-        SettableBlobMeta meta = getStoredBlobMeta(key);
-        _aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
+        checkPermission(key, who, WRITE);
         try {
             return new BlobStoreFileOutputStream(fbs.write(DATA_PREFIX+key, 
false));
         } catch (IOException e) {
@@ -220,14 +218,47 @@ public class LocalFsBlobStore extends BlobStore {
     @Override
     public void deleteBlob(String key, Subject who) throws 
AuthorizationException, KeyNotFoundException {
         validateKey(key);
+
+        if (!_aclHandler.checkForValidUsers(who, WRITE)) {
+            // need to get ACL from meta
+            LOG.debug("Retrieving meta to get ACL info... key: {} subject: 
{}", key, who);
+
+            try {
+                checkPermission(key, who, WRITE);
+            } catch (KeyNotFoundException e) {
+                LOG.error("Error while retrieving meta from ZK or local... 
key: {} subject: {}", key, who);
+                throw e;
+            }
+        } else {
+            // able to delete the blob without checking meta's ACL
+            // skip checking everything and continue deleting local files
+            LOG.debug("Given subject is eligible to delete key without 
checking ACL, skipping... key: {} subject: {}",
+                    key, who);
+        }
+
+        try {
+            deleteKeyIgnoringFileNotFound(DATA_PREFIX + key);
+            deleteKeyIgnoringFileNotFound(META_PREFIX + key);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void checkPermission(String key, Subject who, int mask) throws 
KeyNotFoundException, AuthorizationException {
         checkForBlobOrDownload(key);
         SettableBlobMeta meta = getStoredBlobMeta(key);
-        _aclHandler.hasPermissions(meta.get_acl(), WRITE, who, key);
+        _aclHandler.hasPermissions(meta.get_acl(), mask, who, key);
+    }
+
+    private void deleteKeyIgnoringFileNotFound(String key) throws IOException {
         try {
-            fbs.deleteKey(DATA_PREFIX+key);
-            fbs.deleteKey(META_PREFIX+key);
+            fbs.deleteKey(key);
         } catch (IOException e) {
-            throw new RuntimeException(e);
+            if (e instanceof FileNotFoundException) {
+                LOG.debug("Ignoring FileNotFoundException since we're about to 
delete such key... key: {}", key);
+            } else {
+                throw e;
+            }
         }
     }
 

Reply via email to