Repository: storm
Updated Branches:
  refs/heads/master d16ed4cb5 -> d8384f43b


[STORM-2704] Check blob permission before submitting the topology

* closes #2291


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5bc04478
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5bc04478
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5bc04478

Branch: refs/heads/master
Commit: 5bc04478dd4ac3613980abe3d5f5aa8f104efe55
Parents: d16ed4c
Author: Ethan Li <[email protected]>
Authored: Tue Aug 22 13:44:33 2017 -0500
Committer: Jungtaek Lim <[email protected]>
Committed: Fri Aug 25 14:59:23 2017 +0900

----------------------------------------------------------------------
 .../jvm/org/apache/storm/StormSubmitter.java    | 11 +--
 .../src/jvm/org/apache/storm/utils/Utils.java   | 72 ++++++++++++++++----
 .../org/apache/storm/TestConfigValidate.java    | 38 ++++++++---
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |  2 +-
 4 files changed, 92 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5bc04478/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java 
b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
index 0ad5399..0f6baf2 100644
--- a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
+++ b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
@@ -561,15 +561,8 @@ public class StormSubmitter {
         public void onCompleted(String srcFile, String targetFile, long 
totalBytes);
     }
 
-    private static void validateConfs(Map<String, Object> topoConf, 
StormTopology topology) throws IllegalArgumentException, 
InvalidTopologyException {
+    private static void validateConfs(Map<String, Object> topoConf, 
StormTopology topology) throws IllegalArgumentException, 
InvalidTopologyException, AuthorizationException {
         ConfigValidation.validateFields(topoConf);
-        Utils.validateTopologyBlobStoreMap(topoConf, 
getListOfKeysFromBlobStore(topoConf));
-    }
-
-    private static Set<String> getListOfKeysFromBlobStore(Map<String, Object> 
topoConf) {
-        try (NimbusBlobStore client = new NimbusBlobStore()) {
-            client.prepare(topoConf);
-            return Sets.newHashSet(client.listKeys());
-        }
+        Utils.validateTopologyBlobStoreMap(topoConf);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/5bc04478/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java 
b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index 51790c2..b38c20f 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -31,9 +31,7 @@ import java.io.InputStreamReader;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
 import java.io.Serializable;
-import java.io.StringWriter;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.ServerSocket;
@@ -68,16 +66,21 @@ import java.util.zip.GZIPOutputStream;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.input.ClassLoaderObjectInputStream;
 import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.blobstore.NimbusBlobStore;
+import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.ClusterSummary;
 import org.apache.storm.generated.ComponentCommon;
 import org.apache.storm.generated.ComponentObject;
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.generated.Nimbus;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.generated.TopologyInfo;
 import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.security.auth.ReqContext;
 import org.apache.storm.serialization.DefaultSerializationDelegate;
 import org.apache.storm.serialization.SerializationDelegate;
 import org.apache.thrift.TBase;
@@ -96,6 +99,8 @@ import org.yaml.snakeyaml.constructor.SafeConstructor;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import javax.security.auth.Subject;
+
 public class Utils {
     public static final Logger LOG = LoggerFactory.getLogger(Utils.class);
     public static final String DEFAULT_STREAM_ID = "default";
@@ -1005,21 +1010,62 @@ public class Utils {
         return null;
     }
 
-    public static void validateTopologyBlobStoreMap(Map<String, Object> 
topoConf, Set<String> blobStoreKeys) throws InvalidTopologyException {
-        @SuppressWarnings("unchecked")
+    /**
+     * Validate topology blobstore map.
+     * @param topoConf Topology configuration
+     * @throws InvalidTopologyException
+     * @throws AuthorizationException
+     */
+    public static void validateTopologyBlobStoreMap(Map<String, Object> 
topoConf) throws InvalidTopologyException, AuthorizationException {
+        try (NimbusBlobStore client = new NimbusBlobStore()) {
+            client.prepare(topoConf);
+            validateTopologyBlobStoreMap(topoConf, client);
+        }
+    }
+
+    /**
+     * Validate topology blobstore map.
+     * @param topoConf Topology configuration
+     * @param client The NimbusBlobStore client. It must call prepare() before 
being used here.
+     * @throws InvalidTopologyException
+     * @throws AuthorizationException
+     */
+    public static void validateTopologyBlobStoreMap(Map<String, Object> 
topoConf, NimbusBlobStore client)
+            throws InvalidTopologyException, AuthorizationException {
         Map<String, Object> blobStoreMap = (Map<String, Object>) 
topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
         if (blobStoreMap != null) {
-            Set<String> mapKeys = blobStoreMap.keySet();
-            Set<String> missingKeys = new HashSet<>();
-
-            for (String key : mapKeys) {
-                if (!blobStoreKeys.contains(key)) {
-                    missingKeys.add(key);
+            for (String key : blobStoreMap.keySet()) {
+                // try to get BlobMeta
+                // This will check if the key exists and if the subject has 
authorization
+                try {
+                    client.getBlobMeta(key);
+                } catch (KeyNotFoundException keyNotFound) {
+                    // wrap KeyNotFoundException in an InvalidTopologyException
+                    throw new InvalidTopologyException("Key not found: " + 
keyNotFound.get_msg());
                 }
             }
-            if (!missingKeys.isEmpty()) {
-                throw new InvalidTopologyException("The topology blob store 
map does not " +
-                        "contain the valid keys to launch the topology " + 
missingKeys);
+        }
+    }
+
+    /**
+     * Validate topology blobstore map.
+     * @param topoConf Topology configuration
+     * @param blobStore The BlobStore
+     * @throws InvalidTopologyException
+     * @throws AuthorizationException
+     */
+    public static void validateTopologyBlobStoreMap(Map<String, Object> 
topoConf, BlobStore blobStore)
+            throws InvalidTopologyException, AuthorizationException {
+        Map<String, Object> blobStoreMap = (Map<String, Object>) 
topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+        if (blobStoreMap != null) {
+            Subject subject = ReqContext.context().subject();
+            for (String key : blobStoreMap.keySet()) {
+                try {
+                    blobStore.getBlobMeta(key, subject);
+                } catch (KeyNotFoundException keyNotFound) {
+                    // wrap KeyNotFoundException in an InvalidTopologyException
+                    throw new InvalidTopologyException("Key not found: " + 
keyNotFound.get_msg());
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/5bc04478/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java 
b/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
index 74ed71e..b3e25c1 100644
--- a/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
+++ b/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
@@ -20,7 +20,12 @@ package org.apache.storm;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.NimbusBlobStore;
+import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.security.auth.ReqContext;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.validation.ConfigValidation;
 import org.apache.storm.validation.ConfigValidation.*;
@@ -30,15 +35,16 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.security.auth.Subject;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import static org.mockito.Mockito.*;
 
 public class TestConfigValidate {
 
@@ -85,18 +91,34 @@ public class TestConfigValidate {
     }
 
     @Test(expected = InvalidTopologyException.class)
-    public void testValidateTopologyBlobStoreMap() throws 
InvalidTopologyException {
+    public void testValidateTopologyBlobStoreMapWithBlobStore() throws 
InvalidTopologyException, AuthorizationException, KeyNotFoundException {
         Map<String, Object> topoConf = new HashMap<>();
         Map<String,Map> topologyMap = new HashMap<>();
         topologyMap.put("key1", new HashMap<String,String>());
         topologyMap.put("key2", new HashMap<String,String>());
         topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topologyMap);
-        HashSet<String> keySet = new HashSet<>();
-        keySet.add("key1");
-        keySet.add("key2");
-        Utils.validateTopologyBlobStoreMap(topoConf, keySet);
-        keySet.remove("key2");
-        Utils.validateTopologyBlobStoreMap(topoConf, keySet);
+        Subject subject = ReqContext.context().subject();
+
+        BlobStore blobStoreMock = mock(BlobStore.class);
+        when(blobStoreMock.getBlobMeta("key1", subject)).thenReturn(null);
+        when(blobStoreMock.getBlobMeta("key2", subject)).thenThrow(new 
KeyNotFoundException());
+
+        Utils.validateTopologyBlobStoreMap(topoConf, blobStoreMock);
+    }
+
+    @Test(expected = InvalidTopologyException.class)
+    public void testValidateTopologyBlobStoreMapWithNimbusBlobStore() throws 
InvalidTopologyException, AuthorizationException, KeyNotFoundException {
+        Map<String, Object> topoConf = new HashMap<>();
+        Map<String,Map> topologyMap = new HashMap<>();
+        topologyMap.put("key1", new HashMap<String,String>());
+        topologyMap.put("key2", new HashMap<String,String>());
+        topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topologyMap);
+
+        NimbusBlobStore nimbusBlobStoreMock = mock(NimbusBlobStore.class);
+        when(nimbusBlobStoreMock.getBlobMeta("key1")).thenReturn(null);
+        when(nimbusBlobStoreMock.getBlobMeta("key2")).thenThrow(new 
KeyNotFoundException());
+
+        Utils.validateTopologyBlobStoreMap(topoConf, nimbusBlobStoreMock);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/storm/blob/5bc04478/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java 
b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 6a6b7c9..fb2c3e1 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -2529,7 +2529,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
                 }
             }
             validateTopologyWorkerMaxHeapSizeConfigs(topoConf, topology);
-            Utils.validateTopologyBlobStoreMap(topoConf, 
Sets.newHashSet(blobStore.listKeys()));
+            Utils.validateTopologyBlobStoreMap(topoConf, blobStore);
             long uniqueNum = submittedCount.incrementAndGet();
             String topoId = topoName + "-" + uniqueNum + "-" + 
Time.currentTimeSecs();
             Map<String, String> creds = null;

Reply via email to