This is an automated email from the ASF dual-hosted git repository.
agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 6c8b6cb STORM-3775 validate optional boolean fields in
topology.blobstore.map (#3399)
6c8b6cb is described below
commit 6c8b6cbeb387817d72432637acd19a71440e0173
Author: agresch <[email protected]>
AuthorDate: Mon Jun 21 09:41:58 2021 -0500
STORM-3775 validate optional boolean fields in topology.blobstore.map
(#3399)
* STORM-3775 validate optional boolean fields in topology.blobstore.map
---
.../src/jvm/org/apache/storm/utils/Utils.java | 12 ++++++--
.../jvm/org/apache/storm/TestConfigValidate.java | 36 +++++++++++++++++++++-
.../storm/daemon/supervisor/SupervisorUtils.java | 4 +--
3 files changed, 47 insertions(+), 5 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index 1f500c0..a634402 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -77,7 +77,6 @@ import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.blobstore.NimbusBlobStore;
import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.ComponentObject;
import org.apache.storm.generated.GlobalStreamId;
@@ -1233,9 +1232,18 @@ public class Utils {
*/
public static void validateTopologyBlobStoreMap(Map<String, Object>
topoConf, NimbusBlobStore client)
throws InvalidTopologyException, AuthorizationException {
- Map<String, Object> blobStoreMap = (Map<String, Object>)
topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
+ Map<String, Map<String, Object>> blobStoreMap = (Map<String,
Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
if (blobStoreMap != null) {
for (String key : blobStoreMap.keySet()) {
+
+ Map<String, Object> blobConf = blobStoreMap.get(key);
+ try {
+ ObjectReader.getBoolean(blobConf.get("uncompress"), false);
+ ObjectReader.getBoolean(blobConf.get("workerRestart"),
false);
+ } catch (IllegalArgumentException e) {
+ throw new WrappedInvalidTopologyException("Invalid blob
conf option: " + e.getMessage());
+ }
+
// try to get BlobMeta
// This will check if the key exists and if the subject has
authorization
try {
diff --git a/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
b/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
index dedf937..8728f12 100644
--- a/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
+++ b/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java
@@ -136,7 +136,7 @@ public class TestConfigValidate {
}
@Test(expected = InvalidTopologyException.class)
- public void testValidateTopologyBlobStoreMapWithNimbusBlobStore() throws
InvalidTopologyException, AuthorizationException,
+ public void testValidateTopologyBlobStoreMissingKey() throws
InvalidTopologyException, AuthorizationException,
KeyNotFoundException {
Map<String, Object> topoConf = new HashMap<>();
Map<String, Map> topologyMap = new HashMap<>();
@@ -152,6 +152,40 @@ public class TestConfigValidate {
}
@Test
+ public void testValidateTopologyBlobStoreMap() throws
InvalidTopologyException, AuthorizationException,
+ KeyNotFoundException {
+ Map<String, Object> topoConf = new HashMap<>();
+ Map<String, Map> topologyMap = new HashMap<>();
+ Map<String, Object> blobConf = new HashMap<>();
+ blobConf.put("uncompress", false);
+ topologyMap.put("key1", blobConf);
+ topologyMap.put("key2", blobConf);
+ topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topologyMap);
+
+ NimbusBlobStore nimbusBlobStoreMock = mock(NimbusBlobStore.class);
+ when(nimbusBlobStoreMock.getBlobMeta("key1")).thenReturn(null);
+ when(nimbusBlobStoreMock.getBlobMeta("key2")).thenReturn(null);
+
+ Utils.validateTopologyBlobStoreMap(topoConf, nimbusBlobStoreMock);
+ }
+
+ @Test(expected = InvalidTopologyException.class)
+ public void testValidateTopologyBlobStoreMapInvalidOption() throws
InvalidTopologyException, AuthorizationException,
+ KeyNotFoundException {
+ Map<String, Object> topoConf = new HashMap<>();
+ Map<String, Map> topologyMap = new HashMap<>();
+ Map<String, Object> blobConf = new HashMap<>();
+ blobConf.put("uncompress", "false");
+ topologyMap.put("key1", blobConf);
+ topologyMap.put("key2", blobConf);
+ topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topologyMap);
+
+ NimbusBlobStore nimbusBlobStoreMock = mock(NimbusBlobStore.class);
+
+ Utils.validateTopologyBlobStoreMap(topoConf, nimbusBlobStoreMock);
+ }
+
+ @Test
public void defaultYamlTest() throws InvocationTargetException,
NoSuchMethodException, NoSuchFieldException, InstantiationException,
IllegalAccessException {
Map<String, Object> conf = Utils.readStormConfig();
diff --git
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index 43aff1f..5948a54 100644
---
a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++
b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -120,7 +120,7 @@ public class SupervisorUtils {
}
/**
- * Given the blob information returns the value of the uncompress field,
handling it either being a string or a boolean value, or if
+ * Given the blob information returns the value of the uncompress field,
handling it being a boolean value, or if
* it's not specified then returns false.
*/
public static boolean shouldUncompressBlob(Map<String, Object> blobInfo) {
@@ -128,7 +128,7 @@ public class SupervisorUtils {
}
/**
- * Given the blob information returns the value of the workerRestart
field, handling it either being a string or a boolean value, or if
+ * Given the blob information returns the value of the workerRestart
field, handling it being a boolean value, or if
* it's not specified then returns false.
*
* @param blobInfo the info for the blob.