[ 
https://issues.apache.org/jira/browse/STORM-2979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16394521#comment-16394521
 ] 

michel hummel commented on STORM-2979:
--------------------------------------

The issue seems to be related to the use of a ByteBuffer to store the 
serialized worker hook.
The ByteBuffer is deserialized twice:
 * On start
 * On stop

On worker start, the worker reads the byte array out of the ByteBuffer, which 
leaves the buffer's position at its end.
On worker stop, the worker reads the byte array again, but the position is 
still at the end, so no bytes remain and deserialization fails with the 
EOFException.
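
The behaviour described above can be reproduced with plain java.nio, 
independent of Storm. This is a minimal sketch, assuming the worker copies 
the bytes out with a relative bulk get; the class and method names are 
hypothetical and not taken from the Storm code base.

```java
import java.nio.ByteBuffer;

public class ByteBufferPositionDemo {

    // Copies all remaining bytes out of the buffer.
    // A relative bulk get() advances the buffer's position to its limit.
    static byte[] drain(ByteBuffer buf) {
        byte[] out = new byte[buf.remaining()];
        buf.get(out);
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});

        byte[] onStart = drain(buf); // stands in for the read on worker start
        byte[] onStop = drain(buf);  // stands in for the read on worker stop

        System.out.println("start read: " + onStart.length + " bytes"); // 4 bytes
        System.out.println("stop read:  " + onStop.length + " bytes");  // 0 bytes
    }
}
```

The second read returns an empty array, and handing an empty array to an 
ObjectInputStream fails while reading the stream header, which matches the 
EOFException in the quoted stack trace.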


There are multiple ways to fix this issue:


 *  In worker.clj run-worker-start-hooks, reset the ByteBuffer position 
after the worker hook starts.

This is the purpose of my commit: 
[https://github.com/hummelm/storm/commit/7afeff6d8db4a78250ff8827207e80247b0acd25]

 
 * Add a method to the Utils class that retrieves a byte array from a 
ByteBuffer without changing the buffer's internal position, and use it 
in worker.clj.
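
Both options can be sketched in plain Java. This is only an illustration 
under assumptions: the class and method names are hypothetical, and it is 
not the code from the commit linked above nor an existing Utils method.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ByteBufferReadFixes {

    // Option 1: read the bytes, then put the position back where it was.
    // Restoring the saved position (rather than calling rewind()) also works
    // when the buffer did not start at position 0.
    static byte[] readAndRestore(ByteBuffer buf) {
        int start = buf.position();
        byte[] out = new byte[buf.remaining()];
        buf.get(out);
        buf.position(start);
        return out;
    }

    // Option 2: read through a duplicate. The duplicate shares the content
    // but has its own position and limit, so the original is never moved.
    static byte[] readWithoutMoving(ByteBuffer buf) {
        ByteBuffer view = buf.duplicate();
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});

        byte[] first = readWithoutMoving(buf);
        byte[] second = readAndRestore(buf);

        System.out.println(Arrays.equals(first, second)); // true
        System.out.println(buf.position());               // 0
    }
}
```

With either helper, a second read on worker stop sees the same bytes as the 
first read on worker start.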

Maybe a Storm developer can advise us on the better way to fix the 
issue. It is blocking our development, and we will be happy to create a 
pull request.

 

One thing is strange: the ByteBuffer seems to contain more data than expected. 
The serialized worker hook is about 106 bytes, yet the buffer position is 
more than 2800.

Either way, the proposed fix works.
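
On the large position noted above: one possible explanation, which is an 
assumption and has not been checked against the Storm or Thrift sources, is 
that the ByteBuffer is a window onto a larger backing array (for example 
the whole serialized topology) rather than a buffer of its own. The sketch 
below uses hypothetical sizes and names to show how java.nio reports such 
a buffer.

```java
import java.nio.ByteBuffer;

public class WindowedBufferDemo {

    // Wraps a slice of a larger array. The resulting buffer's position is
    // the offset, and its limit is offset + length.
    static ByteBuffer window(byte[] backing, int offset, int length) {
        return ByteBuffer.wrap(backing, offset, length);
    }

    public static void main(String[] args) {
        byte[] message = new byte[4096];              // hypothetical enclosing message
        ByteBuffer hook = window(message, 2700, 106); // hypothetical 106-byte payload

        System.out.println(hook.position());     // 2700
        System.out.println(hook.remaining());    // 106
        System.out.println(hook.limit());        // 2806
        System.out.println(hook.array().length); // 4096
    }
}
```

If that is what happens here, a position above 2800 after reading about 106 
bytes is expected, and any fix should restore the saved starting position 
instead of rewinding to 0.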

> WorkerHooks EOFException during run_worker_shutdown_hooks
> ---------------------------------------------------------
>
>                 Key: STORM-2979
>                 URL: https://issues.apache.org/jira/browse/STORM-2979
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-core
>    Affects Versions: 1.0.6
>         Environment: centos 7
> storm-core 1.0.6
> eclipse Mars2
> java 1.8.0_151
>            Reporter: Robin Perice
>            Priority: Major
>
> Hi,
> I'm trying to use the BaseWorkerHook, but an exception is thrown after I 
> kill the topology.
> The issue is exactly the same as: 
> [http://user.storm.apache.narkive.com/uchOrwlH/workerhook-deserialization-problem|http://user.storm.apache.narkive.com/uchOrwlH/workerhook-deserialization-problem]
> An extract of my code:
>  
> {code:java}
> // topology
> final TridentTopology topology = new TridentTopology();
> // ... I skip all the topology configuration part
> final StormTopology topo = topology.build();
> // hook
> final BaseWorkerHook hook = new BaseWorkerHook();
> final ByteBuffer serializedHook = ByteBuffer.wrap(Utils.javaSerialize(hook));
> // add_to_worker_hooks takes the serialized ByteBuffer, not the hook object
> topo.add_to_worker_hooks(serializedHook);
> // submit topology
> LocalCluster cluster = new LocalCluster();
> cluster.submitTopology(name,config,topo);
> Utils.sleep(60000);
> // kill topology
> final KillOptions killOptions = new KillOptions();
> killOptions.set_wait_secs(0);
> cluster.killTopologyWithOpts(name, killOptions);
> Utils.sleep(10000);
> cluster.shutdown();
> {code}
>  
> I have the following error:
> {code:java}
> java.lang.RuntimeException: java.io.EOFException
>     at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:254)
>     at org.apache.storm.daemon.worker$run_worker_shutdown_hooks$iter__5456__5460$fn__5461.invoke(worker.clj:578)
>     at clojure.lang.LazySeq.sval(LazySeq.java:40)
>     at clojure.lang.LazySeq.seq(LazySeq.java:49)
>     at clojure.lang.RT.seq(RT.java:507)
>     at clojure.core$seq__4128.invoke(core.clj:137)
>     at clojure.core$dorun.invoke(core.clj:3009)
>     at clojure.core$doall.invoke(core.clj:3025)
>     at org.apache.storm.daemon.worker$run_worker_shutdown_hooks.invoke(worker.clj:576)
>     at org.apache.storm.daemon.worker$fn__5471$exec_fn__1371__auto__$reify__5473$shutdown_STAR___5493.invoke(worker.clj:693)
>     at org.apache.storm.daemon.worker$fn__5471$exec_fn__1371__auto__$reify$reify__5519.shutdown(worker.clj:706)
>     at org.apache.storm.ProcessSimulator.killProcess(ProcessSimulator.java:67)
>     at org.apache.storm.daemon.supervisor.LocalContainer.kill(LocalContainer.java:59)
>     at org.apache.storm.daemon.supervisor.Slot.killContainerForChangedAssignment(Slot.java:311)
>     at org.apache.storm.daemon.supervisor.Slot.handleRunning(Slot.java:527)
>     at org.apache.storm.daemon.supervisor.Slot.stateMachineStep(Slot.java:265)
>     at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:741)
> Caused by: java.io.EOFException
>     at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2680)
>     at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3155)
>     at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:864)
>     at java.io.ObjectInputStream.<init>(ObjectInputStream.java:360)
>     at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:245)
>     ... 16 more
> {code}
>  
> Maybe it is related to log4j shutdown hooks 
> (https://issues.apache.org/jira/browse/STORM-2176) so I tried to disable the 
> hook in my src/test/resources/log4j2.xml.
>  
> {code:xml}
> <Configuration monitorInterval="60" shutdownHook="disable">
>     <Appenders>
>         <Console name="Console" target="SYSTEM_OUT">
>             <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
>         </Console>
>     </Appenders>
>     <Loggers>
>         <Root level="debug">
>             <AppenderRef ref="Console" />
>         </Root>
>     </Loggers>
> </Configuration>
> {code}
> But it does not change anything.
>  
> Of course the purpose of my work is to use my own worker hook extending the 
> BaseWorkerHook.
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
