Repository: storm
Updated Branches:
  refs/heads/master d16ed4cb5 -> d8384f43b


[STORM-2704] Check blob permission before submitting the topology

* closes #2291


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5bc04478
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5bc04478
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5bc04478

Branch: refs/heads/master
Commit: 5bc04478dd4ac3613980abe3d5f5aa8f104efe55
Parents: d16ed4c
Author: Ethan Li <[email protected]>
Authored: Tue Aug 22 13:44:33 2017 -0500
Committer: Jungtaek Lim <[email protected]>
Committed: Fri Aug 25 14:59:23 2017 +0900

----------------------------------------------------------------------
 .../jvm/org/apache/storm/StormSubmitter.java    | 11 +--
 .../src/jvm/org/apache/storm/utils/Utils.java   | 72 ++++++++++++++++----
 .../org/apache/storm/TestConfigValidate.java    | 38 ++++++++---
 .../org/apache/storm/daemon/nimbus/Nimbus.java  |  2 +-
 4 files changed, 92 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5bc04478/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
index 0ad5399..0f6baf2 100644
--- a/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
+++ b/storm-client/src/jvm/org/apache/storm/StormSubmitter.java
@@ -561,15 +561,8 @@ public class StormSubmitter {
         public void onCompleted(String srcFile, String targetFile, long totalBytes);
     }
 
-    private static void validateConfs(Map<String, Object> topoConf, StormTopology topology) throws IllegalArgumentException, InvalidTopologyException {
+    private static void validateConfs(Map<String, Object> topoConf, StormTopology topology) throws IllegalArgumentException, InvalidTopologyException, AuthorizationException {
         ConfigValidation.validateFields(topoConf);
-        Utils.validateTopologyBlobStoreMap(topoConf, getListOfKeysFromBlobStore(topoConf));
-    }
-
-    private static Set<String> getListOfKeysFromBlobStore(Map<String, Object> topoConf) {
-        try (NimbusBlobStore client = new NimbusBlobStore()) {
-            client.prepare(topoConf);
-            return Sets.newHashSet(client.listKeys());
-        }
+        Utils.validateTopologyBlobStoreMap(topoConf);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/5bc04478/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index 51790c2..b38c20f 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -31,9 +31,7 @@ import java.io.InputStreamReader;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
 import java.io.Serializable;
-import java.io.StringWriter;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.ServerSocket;
@@ -68,16 +66,21 @@ import java.util.zip.GZIPOutputStream;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.input.ClassLoaderObjectInputStream;
 import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.blobstore.NimbusBlobStore;
+import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.ClusterSummary;
 import org.apache.storm.generated.ComponentCommon;
 import org.apache.storm.generated.ComponentObject;
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.generated.Nimbus;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.generated.TopologyInfo;
 import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.security.auth.ReqContext;
 import org.apache.storm.serialization.DefaultSerializationDelegate;
 import org.apache.storm.serialization.SerializationDelegate;
 import org.apache.thrift.TBase;
@@ -96,6 +99,8 @@ import org.yaml.snakeyaml.constructor.SafeConstructor;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import javax.security.auth.Subject;
+
 public class Utils {
     public static final Logger LOG = LoggerFactory.getLogger(Utils.class);
     public static final String DEFAULT_STREAM_ID = "default";
@@ -1005,21 +1010,62 @@ public class Utils {
         return null;
     }
 
-    public static void validateTopologyBlobStoreMap(Map<String, Object> topoConf, Set<String> blobStoreKeys) throws InvalidTopologyException {
-        @SuppressWarnings("unchecked")
+    /**
+     * Validate topology blobstore map.
+     * @param topoConf Topology configuration
+     * @throws InvalidTopologyException
+     * @throws AuthorizationException
+     */
+    public static void validateTopologyBlobStoreMap(Map<String, Object> topoConf) throws InvalidTopologyException, AuthorizationException {
+        try (NimbusBlobStore client = new NimbusBlobStore()) {
+            client.prepare(topoConf);
+            validateTopologyBlobStoreMap(topoConf, client);
+        }
+    }
+
+    /**
+     * Validate topology blobstore map.
+     * @param topoConf Topology configuration
+     * @param client The NimbusBlobStore client. It must call prepare() before being used here.
+     * @throws InvalidTopologyException
+     * @throws AuthorizationException
+     */
+    public static void validateTopologyBlobStoreMap(Map<String, Object> topoConf, NimbusBlobStore client)
+            throws InvalidTopologyException, AuthorizationException {
         Map<String, Object> blobStoreMap = (Map<String, Object>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
         if (blobStoreMap != null) {
-            Set<String> mapKeys = blobStoreMap.keySet();
-            Set<String> missingKeys = new HashSet<>();
-
-            for (String key : mapKeys) {
-                if (!blobStoreKeys.contains(key)) {
-                    missingKeys.add(key);
+            for (String key : blobStoreMap.keySet()) {
+                // try to get BlobMeta
+                // This will check if the key exists and if the subject has authorization
+                try {
+                    client.getBlobMeta(key);
+                } catch (KeyNotFoundException keyNotFound) {
+                    // wrap KeyNotFoundException in an InvalidTopologyException
+                    throw new InvalidTopologyException("Key not found: " + keyNotFound.get_msg());
                 }
             }
-            if (!missingKeys.isEmpty()) {
-                throw new InvalidTopologyException("The topology blob store map does not " +
-                        "contain the valid keys to launch the topology " + missingKeys);
+        }
+    }
+
+    /**
+     * Validate topology blobstore map.
+     * @param topoConf Topology configuration
+     * @param blobStore The BlobStore
+     * @throws InvalidTopologyException
+     * @throws AuthorizationException
+     */
+    public static void validateTopologyBlobStoreMap(Map<String, Object> topoConf, BlobStore blobStore)
+            throws InvalidTopologyException, AuthorizationException {
+        Map<String, Object> blobStoreMap = (Map<String, Object>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+        if (blobStoreMap != null) {
+            Subject subject = ReqContext.context().subject();
+            for (String key : blobStoreMap.keySet()) {
+                try {
+                    blobStore.getBlobMeta(key, subject);
+                } catch (KeyNotFoundException keyNotFound) {
+                    // wrap KeyNotFoundException in an InvalidTopologyException
+                    throw new InvalidTopologyException("Key not found: " + keyNotFound.get_msg());
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/5bc04478/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java b/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
index 74ed71e..b3e25c1 100644
--- a/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
+++ b/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
@@ -20,7 +20,12 @@ package org.apache.storm;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.NimbusBlobStore;
+import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.KeyNotFoundException;
+import org.apache.storm.security.auth.ReqContext;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.validation.ConfigValidation;
 import org.apache.storm.validation.ConfigValidation.*;
@@ -30,15 +35,16 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.security.auth.Subject;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import static org.mockito.Mockito.*;
 
 public class TestConfigValidate {
 
@@ -85,18 +91,34 @@ public class TestConfigValidate {
     }
 
     @Test(expected = InvalidTopologyException.class)
-    public void testValidateTopologyBlobStoreMap() throws InvalidTopologyException {
+    public void testValidateTopologyBlobStoreMapWithBlobStore() throws InvalidTopologyException, AuthorizationException, KeyNotFoundException {
         Map<String, Object> topoConf = new HashMap<>();
         Map<String,Map> topologyMap = new HashMap<>();
         topologyMap.put("key1", new HashMap<String,String>());
         topologyMap.put("key2", new HashMap<String,String>());
         topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topologyMap);
-        HashSet<String> keySet = new HashSet<>();
-        keySet.add("key1");
-        keySet.add("key2");
-        Utils.validateTopologyBlobStoreMap(topoConf, keySet);
-        keySet.remove("key2");
-        Utils.validateTopologyBlobStoreMap(topoConf, keySet);
+        Subject subject = ReqContext.context().subject();
+
+        BlobStore blobStoreMock = mock(BlobStore.class);
+        when(blobStoreMock.getBlobMeta("key1", subject)).thenReturn(null);
+        when(blobStoreMock.getBlobMeta("key2", subject)).thenThrow(new KeyNotFoundException());
+
+        Utils.validateTopologyBlobStoreMap(topoConf, blobStoreMock);
+    }
+
+    @Test(expected = InvalidTopologyException.class)
+    public void testValidateTopologyBlobStoreMapWithNimbusBlobStore() throws InvalidTopologyException, AuthorizationException, KeyNotFoundException {
+        Map<String, Object> topoConf = new HashMap<>();
+        Map<String,Map> topologyMap = new HashMap<>();
+        topologyMap.put("key1", new HashMap<String,String>());
+        topologyMap.put("key2", new HashMap<String,String>());
+        topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topologyMap);
+
+        NimbusBlobStore nimbusBlobStoreMock = mock(NimbusBlobStore.class);
+        when(nimbusBlobStoreMock.getBlobMeta("key1")).thenReturn(null);
+        when(nimbusBlobStoreMock.getBlobMeta("key2")).thenThrow(new KeyNotFoundException());
+
+        Utils.validateTopologyBlobStoreMap(topoConf, nimbusBlobStoreMock);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/storm/blob/5bc04478/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 6a6b7c9..fb2c3e1 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -2529,7 +2529,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
                 }
             }
             validateTopologyWorkerMaxHeapSizeConfigs(topoConf, topology);
-            Utils.validateTopologyBlobStoreMap(topoConf, Sets.newHashSet(blobStore.listKeys()));
+            Utils.validateTopologyBlobStoreMap(topoConf, blobStore);
             long uniqueNum = submittedCount.incrementAndGet();
             String topoId = topoName + "-" + uniqueNum + "-" + Time.currentTimeSecs();
             Map<String, String> creds = null;
