Github user hummelm commented on the issue:

    https://github.com/apache/storm/pull/2591
  
    After some digging I finally found a workaround.
    Using a simple trick in the start method of the worker hook (rewinding
    the byteBuffer to allow the deserialization on stop), it can work as
    expected. Here is a simple example in case someone encounters the same
    issue:
    
    package myTest.stormWorkerHook;
    
    import java.nio.ByteBuffer;
    import java.util.Map;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.hooks.BaseWorkerHook;
    import org.apache.storm.task.WorkerTopologyContext;
    import org.apache.storm.trident.TridentState;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.operation.builtin.Count;
    import org.apache.storm.trident.testing.FixedBatchSpout;
    import org.apache.storm.trident.testing.MemoryMapState;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    
    public class App {
    
        static class myHook extends BaseWorkerHook {
            // store the size of the serialization of this class
            int mySize = 0;
    
            public int getMySize() {
                return mySize;
            }
    
            public void setMySize(int mySize) {
                this.mySize = mySize;
            }
    
            /**
             *
             */
            private static final long serialVersionUID = 1L;
    
             public void start(Map stormConf, WorkerTopologyContext context) {
                   System.out.println("### START HOOK");
                   ByteBuffer bf =
    context.getRawTopology().get_worker_hooks().get(0);
                   // getting the byte data for deserialization has move
    the position
                   // to the end of the buffer
                   int pos = bf.position();
                   // rewind the position of the buffer to allow
                   // the deserialization which will occur on STOp
                   bf.position(pos-mySize);
    
                }
    
                /**
                 * This method is called right before a worker shuts down
                 */
                @Override
                public void shutdown() {
                       System.out.println("### STOP HOOK");
                }
    
        }
    
    
    
        /*
         * Function takes the "sentence" field and emits a tuple for each word.
         */
        public static class Split extends BaseFunction {
            /**
             * Reads the "sentence" field (position 0) and emits one
             * single-field tuple per whitespace-separated word.
             */
            @Override
            public void execute(TridentTuple tuple, TridentCollector collector) {
                final String sentence = tuple.getString(0);
                final String[] words = sentence.split(" ");
                for (int i = 0; i < words.length; i++) {
                    collector.emit(new Values(words[i]));
                }
            }
        }
    
        // simple topo
        public static StormTopology buildTridentTopology() {
            /*
             * spout reads an infinite stream of sentences from the following 
source
             */
            @SuppressWarnings("unchecked")
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 
3,
                    new Values("the cow jumped over the moon"),
                    new Values("the man went to the store and bought some 
candy"),
                    new Values("four score and seven years ago"),
                    new Values("how many apples can you eat"),
                    new Values("to be or not to be the person"));
            spout.setCycle(true);
    
            /*
             * TridentTopology object exposes the interface for
    constructing Trident computations
             */
            TridentTopology topology = new TridentTopology();
    
            TridentState wordCounts =
                    topology.newStream("spout1", spout)
                            .parallelismHint(16)
                            .each(new Fields("sentence"), new Split(), new
    Fields("word"))
                            .groupBy(new Fields("word"))
                            .persistentAggregate(new
    MemoryMapState.Factory(), new Count(), new Fields("count"))
                            .parallelismHint(16)
    
    
            ;
            StormTopology topo = topology.build();
            myHook mh =new myHook();
            // serialize one time to get the size of the byte[] result
            byte[] rawdata = Utils.javaSerialize(mh);
            // store the size inside the hook
            // it will be use to fix issue regarding byteBuffer
          mh.setMySize(rawdata.length );
          // serialize with the the size as attibute
          rawdata = Utils.javaSerialize(mh);
    
            ByteBuffer a = ByteBuffer.wrap(rawdata);
            topo.add_to_worker_hooks(a);
    
            return topo;
    
        }
    
    
    
    
        public static void main(String[] args) throws Exception {
    
            Config conf = new Config();
            conf.setMaxSpoutPending(20);
    
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("wordCounter", conf, 
buildTridentTopology());
                    Thread.sleep(10000);
                    cluster.killTopology("wordCounter");
                    Thread.sleep(10000);
                    cluster.shutdown();


---

Reply via email to