Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2399#discussion_r148329501
--- Diff:
storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
---
@@ -17,44 +17,102 @@
*/
package org.apache.storm.messaging;
+import org.apache.storm.Config;
import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.serialization.KryoTupleDeserializer;
import org.apache.storm.task.GeneralTopologyContext;
import org.apache.storm.tuple.AddressedTuple;
-import org.apache.storm.serialization.KryoTupleDeserializer;
+import org.apache.storm.tuple.Tuple;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* A class that is called when a TaskMessage arrives.
*/
-public class DeserializingConnectionCallback implements
IConnectionCallback {
+public class DeserializingConnectionCallback implements
IConnectionCallback, IMetric {
private final WorkerState.ILocalTransferCallback _cb;
private final Map _conf;
private final GeneralTopologyContext _context;
+
private final ThreadLocal<KryoTupleDeserializer> _des =
- new ThreadLocal<KryoTupleDeserializer>() {
- @Override
- protected KryoTupleDeserializer initialValue() {
- return new KryoTupleDeserializer(_conf, _context);
- }
- };
-
- public DeserializingConnectionCallback(final Map<String, Object> conf,
final GeneralTopologyContext context, WorkerState.ILocalTransferCallback
callback) {
+ new ThreadLocal<KryoTupleDeserializer>() {
+ @Override
+ protected KryoTupleDeserializer initialValue() {
+ return new KryoTupleDeserializer(_conf, _context);
+ }
+ };
+
+ // Track serialized size of messages
+ private final boolean _sizeMetricsEnabled;
+ private final ConcurrentHashMap<String, AtomicLong> _byteCounts = new
ConcurrentHashMap<>();
+
+
+ public DeserializingConnectionCallback(final Map conf, final
GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
_conf = conf;
_context = context;
_cb = callback;
+ _sizeMetricsEnabled =
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean &&
+ (Boolean)
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS);
+
}
@Override
public void recv(List<TaskMessage> batch) {
KryoTupleDeserializer des = _des.get();
ArrayList<AddressedTuple> ret = new ArrayList<>(batch.size());
for (TaskMessage message: batch) {
- ret.add(new AddressedTuple(message.task(),
des.deserialize(message.message())));
+ Tuple tuple = des.deserialize(message.message());
+ AddressedTuple addrTuple = new AddressedTuple(message.task(),
tuple);
+ updateMetrics(tuple.getSourceTask(), message);
+ ret.add(addrTuple);
}
_cb.transfer(ret);
}
+ @Override
+ public Object getValueAndReset() {
+ HashMap<String, Long> outMap = new HashMap<>();
--- End diff --
Can we return null when metrics are not enabled, instead of an empty map? A
null return means the metric will not be sent to the metrics collector at all,
whereas an empty map is ambiguous: it could mean either that the metrics are
disabled or that there was simply no activity during the interval.
---