Github user hummelm commented on the issue: https://github.com/apache/storm/pull/2591 After some digging I finally found a workaround. Using a simple trick in the start method of the worker hook (rewinding the ByteBuffer to allow the deserialization on stop) it can work as expected. Here is a simple example if someone encounters the same issue:

package myTest.stormWorkerHook;

import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.hooks.BaseWorkerHook;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * Demonstrates a workaround for the worker-hook deserialization problem:
 * reading the hook's serialized bytes out of the topology's ByteBuffer
 * advances the buffer position to its end, so a later read (on worker stop)
 * finds nothing to deserialize. The hook rewinds the buffer in start() by
 * exactly its own serialized size so the bytes can be read again.
 */
public class App {

    static class MyHook extends BaseWorkerHook {

        private static final long serialVersionUID = 1L;

        // Size in bytes of the Java serialization of this hook instance.
        // Used in start() to rewind the ByteBuffer so that the
        // deserialization which occurs again on STOP can succeed.
        private int mySize = 0;

        public int getMySize() {
            return mySize;
        }

        public void setMySize(int mySize) {
            this.mySize = mySize;
        }

        @Override
        public void start(Map stormConf, WorkerTopologyContext context) {
            System.out.println("### START HOOK");
            ByteBuffer bf = context.getRawTopology().get_worker_hooks().get(0);
            // Reading the byte data for deserialization has moved the
            // position to the end of the buffer ...
            int pos = bf.position();
            // ... so rewind the position by this hook's serialized size to
            // allow the deserialization which will occur on STOP.
            bf.position(pos - mySize);
        }

        /** Called right before a worker shuts down. */
        @Override
        public void shutdown() {
            System.out.println("### STOP HOOK");
        }
    }

    /** Takes the "sentence" field and emits one tuple per word. */
    public static class Split extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String sentence = tuple.getString(0);
            for (String word : sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }
    }

    /**
     * Builds a simple word-count Trident topology and attaches the
     * serialized worker hook to it.
     *
     * @return the topology with the worker hook registered
     */
    public static StormTopology buildTridentTopology() {
        // Spout reads an infinite (cycling) stream of sentences from the
        // following fixed source.
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
                new Values("the cow jumped over the moon"),
                new Values("the man went to the store and bought some candy"),
                new Values("four score and seven years ago"),
                new Values("how many apples can you eat"),
                new Values("to be or not to be the person"));
        spout.setCycle(true);

        // TridentTopology exposes the interface for constructing Trident
        // computations.
        TridentTopology topology = new TridentTopology();
        TridentState wordCounts = topology.newStream("spout1", spout)
                .parallelismHint(16)
                .each(new Fields("sentence"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                .parallelismHint(16);

        StormTopology topo = topology.build();

        MyHook mh = new MyHook();
        // Serialize once to learn the size of the byte[] result ...
        byte[] rawdata = Utils.javaSerialize(mh);
        // ... and store that size inside the hook; it is used to fix the
        // ByteBuffer position issue. (A serialized int occupies the same
        // number of bytes regardless of its value, so the total length does
        // not change between the two serializations.)
        mh.setMySize(rawdata.length);
        // Serialize again, now with the size stored as an attribute.
        rawdata = Utils.javaSerialize(mh);
        topo.add_to_worker_hooks(ByteBuffer.wrap(rawdata));
        return topo;
    }

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("wordCounter", conf, buildTridentTopology());
        Thread.sleep(10000);
        cluster.killTopology("wordCounter");
        Thread.sleep(10000);
        cluster.shutdown();
    }
}
---