This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new aa67da5 [STORM-3767] fix NPE in getComponentPendingProfileActions
aa67da5 is described below
commit aa67da56da0e21506624ea9ad12b35b630d28dc8
Author: Meng (Ethan) Li <[email protected]>
AuthorDate: Wed Sep 15 23:52:32 2021 -0500
[STORM-3767] fix NPE in getComponentPendingProfileActions
---
conf/defaults.yaml | 2 +-
docs/STORM-UI-REST-API.md | 4 ++--
docs/Serialization.md | 4 ++--
storm-client/src/jvm/org/apache/storm/Config.java | 2 +-
.../apache/storm/messaging/netty/MessageDecoder.java | 11 ++++++++++-
.../src/jvm/org/apache/storm/utils/ShellUtils.java | 18 ++----------------
.../java/org/apache/storm/daemon/nimbus/Nimbus.java | 12 +++++++-----
7 files changed, 25 insertions(+), 28 deletions(-)
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 3e2a8f8..909823c 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -268,7 +268,7 @@ topology.max.spout.pending: null # ideally should be
larger than topology.pro
topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
topology.builtin.metrics.bucket.size.secs: 60
-topology.fall.back.on.java.serialization: true
+topology.fall.back.on.java.serialization: false
topology.worker.childopts: null
topology.worker.logwriter.childopts: "-Xmx64m"
topology.tick.tuple.freq.secs: null
diff --git a/docs/STORM-UI-REST-API.md b/docs/STORM-UI-REST-API.md
index 9b74d08..b4f1d9b 100644
--- a/docs/STORM-UI-REST-API.md
+++ b/docs/STORM-UI-REST-API.md
@@ -63,7 +63,7 @@ Sample response (does not include all the data fields):
"dev.zookeeper.path": "/tmp/dev-storm-zookeeper",
"topology.tick.tuple.freq.secs": null,
"topology.builtin.metrics.bucket.size.secs": 60,
- "topology.fall.back.on.java.serialization": true,
+ "topology.fall.back.on.java.serialization": false,
"topology.max.error.report.per.interval": 5,
"zmq.linger.millis": 5000,
"topology.skip.missing.kryo.registrations": false,
@@ -746,7 +746,7 @@ Sample response:
"dev.zookeeper.path": "/tmp/dev-storm-zookeeper",
"topology.tick.tuple.freq.secs": null,
"topology.builtin.metrics.bucket.size.secs": 60,
- "topology.fall.back.on.java.serialization": true,
+ "topology.fall.back.on.java.serialization": false,
"topology.max.error.report.per.interval": 5,
"zmq.linger.millis": 5000,
"topology.skip.missing.kryo.registrations": false,
diff --git a/docs/Serialization.md b/docs/Serialization.md
index e35a0f9..47f0606 100644
--- a/docs/Serialization.md
+++ b/docs/Serialization.md
@@ -53,11 +53,11 @@ You may use this like any other service loader and storm
will register the bindi
### Java serialization
-If Storm encounters a type for which it doesn't have a serialization
registered, it will use Java serialization if possible. If the object can't be
serialized with Java serialization, then Storm will throw an error.
+When `Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION` is set to true, if Storm
encounters a type for which it doesn't have a serialization registered, it will
use Java serialization if possible. If the object can't be serialized with Java
serialization, then Storm will throw an error.
Beware that Java serialization is extremely expensive, both in terms of CPU
cost as well as the size of the serialized object. It is highly recommended
that you register custom serializers when you put the topology in production.
The Java serialization behavior is there so that it's easy to prototype new
topologies.
-You can turn off the behavior to fall back on Java serialization by setting
the `Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION` config to false.
+You can turn on/off the behavior to fall back on Java serialization by setting
the `Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION` config to true/false. The
default value is false for security reasons.
### Component-specific serialization registrations
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java
b/storm-client/src/jvm/org/apache/storm/Config.java
index 9de5dd8..27f637a 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -582,7 +582,7 @@ public class Config extends HashMap<String, Object> {
@IsInteger
public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS =
"topology.builtin.metrics.bucket.size.secs";
/**
- * Whether or not to use Java serialization in a topology.
+ * Whether or not to use Java serialization in a topology. The default is
false for security reasons.
*/
@IsBoolean
public static final String TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION =
"topology.fall.back.on.java.serialization";
diff --git
a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
index bced87c..a3b282a 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/MessageDecoder.java
@@ -19,9 +19,12 @@ import org.apache.storm.serialization.KryoValuesDeserializer;
import org.apache.storm.shade.io.netty.buffer.ByteBuf;
import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
import org.apache.storm.shade.io.netty.handler.codec.ByteToMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MessageDecoder extends ByteToMessageDecoder {
+ private static final Logger LOG =
LoggerFactory.getLogger(MessageDecoder.class);
private final KryoValuesDeserializer deser;
public MessageDecoder(KryoValuesDeserializer deser) {
@@ -164,4 +167,10 @@ public class MessageDecoder extends ByteToMessageDecoder {
out.add(ret);
}
}
-}
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ LOG.error("Exception thrown while decoding messages in channel {};
exception: ", ctx.channel(), cause);
+ ctx.close();
+ }
+}
\ No newline at end of file
diff --git a/storm-client/src/jvm/org/apache/storm/utils/ShellUtils.java
b/storm-client/src/jvm/org/apache/storm/utils/ShellUtils.java
index 96b3a02..86e34ea 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/ShellUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/ShellUtils.java
@@ -103,28 +103,14 @@ public abstract class ShellUtils {
}
/**
- * a Unix command to get the current user's groups list.
- */
- public static String[] getGroupsCommand() {
- if (WINDOWS) {
- throw new UnsupportedOperationException("Getting user groups is
not supported on Windows");
- }
- return new String[]{ "bash", "-c", "groups" };
- }
-
- /**
- * a Unix command to get a given user's groups list. If the OS is not
WINDOWS, the command will get the user's primary group first and
- * finally get the groups list which includes the primary group. i.e. the
user's primary group will be included twice.
+ * a Unix command to get a given user's groups list. Windows is not
supported.
*/
public static String[] getGroupsForUserCommand(final String user) {
if (WINDOWS) {
throw new UnsupportedOperationException("Getting user groups is
not supported on Windows");
}
//'groups username' command return is non-consistent across different
unixes
- return new String[]{
- "bash", "-c", "id -gn " + user
- + "&& id -Gn " + user
- };
+ return new String[]{"id", "-Gn", user};
}
private static void joinThread(Thread t) {
diff --git
a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index e1d6fbe..acaa39c 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -3611,12 +3611,14 @@ public class Nimbus implements Iface, Shutdownable,
DaemonCommon {
try {
getComponentPendingProfileActionsCalls.mark();
CommonTopoInfo info = getCommonTopoInfo(id,
"getComponentPendingProfileActions");
- Map<String, String> nodeToHost = info.assignment.get_node_host();
Map<List<? extends Number>, List<Object>> exec2hostPort = new
HashMap<>();
- for (Entry<List<Long>, NodeInfo> entry :
info.assignment.get_executor_node_port().entrySet()) {
- NodeInfo ni = entry.getValue();
- List<Object> hostPort =
Arrays.asList(nodeToHost.get(ni.get_node()),
ni.get_port_iterator().next().intValue());
- exec2hostPort.put(entry.getKey(), hostPort);
+ if (info.assignment != null) {
+ Map<String, String> nodeToHost =
info.assignment.get_node_host();
+ for (Entry<List<Long>, NodeInfo> entry :
info.assignment.get_executor_node_port().entrySet()) {
+ NodeInfo ni = entry.getValue();
+ List<Object> hostPort =
Arrays.asList(nodeToHost.get(ni.get_node()),
ni.get_port_iterator().next().intValue());
+ exec2hostPort.put(entry.getKey(), hostPort);
+ }
}
List<Map<String, Object>> nodeInfos =
StatsUtil.extractNodeInfosFromHbForComp(exec2hostPort,
info.taskToComponent, false, componentId);