Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2399#discussion_r148328916
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java ---
    @@ -17,44 +17,102 @@
      */
     package org.apache.storm.messaging;
     
    +import org.apache.storm.Config;
     import org.apache.storm.daemon.worker.WorkerState;
    +import org.apache.storm.metric.api.IMetric;
    +import org.apache.storm.serialization.KryoTupleDeserializer;
     import org.apache.storm.task.GeneralTopologyContext;
     import org.apache.storm.tuple.AddressedTuple;
    -import org.apache.storm.serialization.KryoTupleDeserializer;
    +import org.apache.storm.tuple.Tuple;
     
     import java.util.ArrayList;
    +import java.util.HashMap;
     import java.util.List;
     import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicLong;
    +
     
     /**
      * A class that is called when a TaskMessage arrives.
      */
    -public class DeserializingConnectionCallback implements 
IConnectionCallback {
    +public class DeserializingConnectionCallback implements 
IConnectionCallback, IMetric {
         private final WorkerState.ILocalTransferCallback _cb;
         private final Map _conf;
         private final GeneralTopologyContext _context;
    +
         private final ThreadLocal<KryoTupleDeserializer> _des =
    -         new ThreadLocal<KryoTupleDeserializer>() {
    -             @Override
    -             protected KryoTupleDeserializer initialValue() {
    -                 return new KryoTupleDeserializer(_conf, _context);
    -             }
    -         };
    -
    -    public DeserializingConnectionCallback(final Map<String, Object> conf, 
final GeneralTopologyContext context, WorkerState.ILocalTransferCallback 
callback) {
    +        new ThreadLocal<KryoTupleDeserializer>() {
    +            @Override
    +            protected KryoTupleDeserializer initialValue() {
    +                return new KryoTupleDeserializer(_conf, _context);
    +            }
    +        };
    +
    +    // Track serialized size of messages
    +    private final boolean _sizeMetricsEnabled;
    +    private final ConcurrentHashMap<String, AtomicLong> _byteCounts = new 
ConcurrentHashMap<>();
    +
    +
    +    public DeserializingConnectionCallback(final Map conf, final 
GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
             _conf = conf;
             _context = context;
             _cb = callback;
    +        _sizeMetricsEnabled = 
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean &&
    +                              (Boolean) 
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS);
    +
         }
     
         @Override
         public void recv(List<TaskMessage> batch) {
             KryoTupleDeserializer des = _des.get();
             ArrayList<AddressedTuple> ret = new ArrayList<>(batch.size());
             for (TaskMessage message: batch) {
    -            ret.add(new AddressedTuple(message.task(), 
des.deserialize(message.message())));
    +            Tuple tuple = des.deserialize(message.message());
    +            AddressedTuple addrTuple = new AddressedTuple(message.task(), 
tuple);
    +            updateMetrics(tuple.getSourceTask(), message);
    +            ret.add(addrTuple);
             }
             _cb.transfer(ret);
         }
     
    +    @Override
    +    public Object getValueAndReset() {
    +        HashMap<String, Long> outMap = new HashMap<>();
    +
    +        if (_sizeMetricsEnabled) { // Possible race conditions
    +            for (Map.Entry<String, AtomicLong> ent : 
_byteCounts.entrySet()) {
    +                AtomicLong count = ent.getValue();
    +                if (count.get() > 0) {
    +                    outMap.put(ent.getKey(), count.getAndSet(0L));
    +                }
    +            }
    +        }
    +        return outMap;
    +    }
    +
    +    /**
    +     * Update serialized byte counts for each message
    +     * @param sourceTaskId source task
    +     * @param message serialized message
    +     */
    +    protected void updateMetrics(int sourceTaskId, TaskMessage message) {
    +        if (_sizeMetricsEnabled) { // Possible race conditions
    --- End diff --
    
    Here too, the `// Possible race conditions` comment is a bit confusing.


---

Reply via email to