Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2399#discussion_r148328186
--- Diff:
storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
---
@@ -17,44 +17,102 @@
*/
package org.apache.storm.messaging;
+import org.apache.storm.Config;
import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.serialization.KryoTupleDeserializer;
import org.apache.storm.task.GeneralTopologyContext;
import org.apache.storm.tuple.AddressedTuple;
-import org.apache.storm.serialization.KryoTupleDeserializer;
+import org.apache.storm.tuple.Tuple;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* A class that is called when a TaskMessage arrives.
*/
-public class DeserializingConnectionCallback implements
IConnectionCallback {
+public class DeserializingConnectionCallback implements
IConnectionCallback, IMetric {
private final WorkerState.ILocalTransferCallback _cb;
private final Map _conf;
private final GeneralTopologyContext _context;
+
private final ThreadLocal<KryoTupleDeserializer> _des =
- new ThreadLocal<KryoTupleDeserializer>() {
- @Override
- protected KryoTupleDeserializer initialValue() {
- return new KryoTupleDeserializer(_conf, _context);
- }
- };
-
- public DeserializingConnectionCallback(final Map<String, Object> conf,
final GeneralTopologyContext context, WorkerState.ILocalTransferCallback
callback) {
+ new ThreadLocal<KryoTupleDeserializer>() {
+ @Override
+ protected KryoTupleDeserializer initialValue() {
+ return new KryoTupleDeserializer(_conf, _context);
+ }
+ };
+
+ // Track serialized size of messages
+ private final boolean _sizeMetricsEnabled;
+ private final ConcurrentHashMap<String, AtomicLong> _byteCounts = new
ConcurrentHashMap<>();
+
+
+ public DeserializingConnectionCallback(final Map conf, final
GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
_conf = conf;
_context = context;
_cb = callback;
+ _sizeMetricsEnabled =
_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean &&
--- End diff --
It might be simpler to use.
```
sizeMetricsEnabled =
ObjectReader.getBoolean(_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS),
false);
```
---