Github user jmartell7 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2399#discussion_r148328791
--- Diff:
storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
---
@@ -17,44 +17,102 @@
*/
package org.apache.storm.messaging;
+import org.apache.storm.Config;
import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.serialization.KryoTupleDeserializer;
import org.apache.storm.task.GeneralTopologyContext;
import org.apache.storm.tuple.AddressedTuple;
-import org.apache.storm.serialization.KryoTupleDeserializer;
+import org.apache.storm.tuple.Tuple;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* A class that is called when a TaskMessage arrives.
*/
-public class DeserializingConnectionCallback implements
IConnectionCallback {
+public class DeserializingConnectionCallback implements
IConnectionCallback, IMetric {
private final WorkerState.ILocalTransferCallback _cb;
private final Map _conf;
private final GeneralTopologyContext _context;
+
private final ThreadLocal<KryoTupleDeserializer> _des =
- new ThreadLocal<KryoTupleDeserializer>() {
- @Override
- protected KryoTupleDeserializer initialValue() {
- return new KryoTupleDeserializer(_conf, _context);
- }
- };
-
- public DeserializingConnectionCallback(final Map<String, Object> conf,
final GeneralTopologyContext context, WorkerState.ILocalTransferCallback
callback) {
+ new ThreadLocal<KryoTupleDeserializer>() {
+ @Override
+ protected KryoTupleDeserializer initialValue() {
+ return new KryoTupleDeserializer(_conf, _context);
+ }
+ };
+
+ // Track serialized size of messages
+ private final boolean _sizeMetricsEnabled;
+ private final ConcurrentHashMap<String, AtomicLong> _byteCounts = new
ConcurrentHashMap<>();
+
+
+ public DeserializingConnectionCallback(final Map conf, final
GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
_conf = conf;
_context = context;
_cb = callback;
+ _sizeMetricsEnabled =
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean &&
+ (Boolean)
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS);
+
}
@Override
public void recv(List<TaskMessage> batch) {
KryoTupleDeserializer des = _des.get();
ArrayList<AddressedTuple> ret = new ArrayList<>(batch.size());
for (TaskMessage message: batch) {
- ret.add(new AddressedTuple(message.task(),
des.deserialize(message.message())));
+ Tuple tuple = des.deserialize(message.message());
+ AddressedTuple addrTuple = new AddressedTuple(message.task(),
tuple);
+ updateMetrics(tuple.getSourceTask(), message);
+ ret.add(addrTuple);
}
_cb.transfer(ret);
}
+ @Override
+ public Object getValueAndReset() {
+ HashMap<String, Long> outMap = new HashMap<>();
+
+ if (_sizeMetricsEnabled) { // Possible race conditions
+ for (Map.Entry<String, AtomicLong> ent :
_byteCounts.entrySet()) {
+ AtomicLong count = ent.getValue();
+ if (count.get() > 0) {
+ outMap.put(ent.getKey(), count.getAndSet(0L));
+ }
+ }
+ }
+ return outMap;
+ }
+
+ /**
+ * Update serialized byte counts for each message
+ * @param sourceTaskId source task
+ * @param message serialized message
+ */
+ protected void updateMetrics(int sourceTaskId, TaskMessage message) {
+ if (_sizeMetricsEnabled) { // Possible race conditions
+ int dest = message.task();
+ int len = message.message().length;
+ String key = Integer.toString(sourceTaskId) + "-" +
Integer.toString(dest);
--- End diff --
Should this key use the component ID instead of the task IDs?
---