Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2399#discussion_r148329501
  
    --- Diff: 
storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
 ---
    @@ -17,44 +17,102 @@
      */
     package org.apache.storm.messaging;
     
    +import org.apache.storm.Config;
     import org.apache.storm.daemon.worker.WorkerState;
    +import org.apache.storm.metric.api.IMetric;
    +import org.apache.storm.serialization.KryoTupleDeserializer;
     import org.apache.storm.task.GeneralTopologyContext;
     import org.apache.storm.tuple.AddressedTuple;
    -import org.apache.storm.serialization.KryoTupleDeserializer;
    +import org.apache.storm.tuple.Tuple;
     
     import java.util.ArrayList;
    +import java.util.HashMap;
     import java.util.List;
     import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicLong;
    +
     
     /**
      * A class that is called when a TaskMessage arrives.
      */
    -public class DeserializingConnectionCallback implements 
IConnectionCallback {
    +public class DeserializingConnectionCallback implements 
IConnectionCallback, IMetric {
         private final WorkerState.ILocalTransferCallback _cb;
         private final Map _conf;
         private final GeneralTopologyContext _context;
    +
         private final ThreadLocal<KryoTupleDeserializer> _des =
    -         new ThreadLocal<KryoTupleDeserializer>() {
    -             @Override
    -             protected KryoTupleDeserializer initialValue() {
    -                 return new KryoTupleDeserializer(_conf, _context);
    -             }
    -         };
    -
    -    public DeserializingConnectionCallback(final Map<String, Object> conf, 
final GeneralTopologyContext context, WorkerState.ILocalTransferCallback 
callback) {
    +        new ThreadLocal<KryoTupleDeserializer>() {
    +            @Override
    +            protected KryoTupleDeserializer initialValue() {
    +                return new KryoTupleDeserializer(_conf, _context);
    +            }
    +        };
    +
    +    // Track serialized size of messages
    +    private final boolean _sizeMetricsEnabled;
    +    private final ConcurrentHashMap<String, AtomicLong> _byteCounts = new 
ConcurrentHashMap<>();
    +
    +
    +    public DeserializingConnectionCallback(final Map conf, final 
GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
             _conf = conf;
             _context = context;
             _cb = callback;
    +        _sizeMetricsEnabled = 
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean &&
    +                              (Boolean) 
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS);
    +
         }
     
         @Override
         public void recv(List<TaskMessage> batch) {
             KryoTupleDeserializer des = _des.get();
             ArrayList<AddressedTuple> ret = new ArrayList<>(batch.size());
             for (TaskMessage message: batch) {
    -            ret.add(new AddressedTuple(message.task(), 
des.deserialize(message.message())));
    +            Tuple tuple = des.deserialize(message.message());
    +            AddressedTuple addrTuple = new AddressedTuple(message.task(), 
tuple);
    +            updateMetrics(tuple.getSourceTask(), message);
    +            ret.add(addrTuple);
             }
             _cb.transfer(ret);
         }
     
    +    @Override
    +    public Object getValueAndReset() {
    +        HashMap<String, Long> outMap = new HashMap<>();
    --- End diff --
    
    Can we return null instead of an empty map when metrics are not enabled?
A null means the metric will not be sent to the metrics collector, whereas an
empty map is ambiguous: it could mean either that the metrics are disabled or
that there was no activity at all.


---

Reply via email to