[ 
https://issues.apache.org/jira/browse/STORM-4000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17795472#comment-17795472
 ] 

Jawad Tahir commented on STORM-4000:
------------------------------------

Hi [~rzo1],

Thank you for your response, and sorry for the late reply; I have been away 
from my machine.

I just checked it, and the new version does not solve the problem.

 

As far as I understand, the problem is not with serialization itself but with 
how late tuples are handled. The input in 
[WindowedBoltExecutor|https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java#L313]
 is already a Tuple, and wrapping it in new Values(input) emits the whole Tuple 
object as a single value. That Tuple holds a reference to 
WorkerTopologyContext, which is not serializable (it has some volatile 
attributes), hence the error. In my opinion, we should change the line to 
{code:java}
windowedOutputCollector.emit(lateTupleStream, input, input.getValues()); {code}
instead of 
{code:java}
windowedOutputCollector.emit(lateTupleStream, input, new Values(input)); {code}
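
To illustrate the difference, here is a minimal, self-contained sketch. It does 
not use Storm at all: FakeTuple and FakeContext are hypothetical stand-ins for 
TupleImpl and WorkerTopologyContext, and plain Java serialization stands in for 
Kryo. It only models the shape of the problem (a tuple object that drags a 
non-serializable context along), not Storm's actual behavior.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LateTupleSketch {

    /** Hypothetical stand-in for WorkerTopologyContext: deliberately NOT Serializable. */
    static class FakeContext { }

    /** Hypothetical stand-in for TupleImpl: plain values plus a context reference. */
    static class FakeTuple implements Serializable {
        private final List<Object> values;
        private final FakeContext context; // this field is what breaks serialization

        FakeTuple(List<Object> values, FakeContext context) {
            this.values = new ArrayList<>(values);
            this.context = context;
        }

        List<Object> getValues() {
            return values;
        }
    }

    /** Returns true if the payload survives Java serialization (stand-in for Kryo). */
    static boolean canSerialize(List<Object> payload) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(payload);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FakeTuple input = new FakeTuple(Arrays.<Object>asList(1, 1000L), new FakeContext());

        // Analogue of new Values(input): the tuple object itself becomes the payload.
        List<Object> wrapped = new ArrayList<>();
        wrapped.add(input);

        // Analogue of input.getValues(): only the plain field values are the payload.
        List<Object> unwrapped = new ArrayList<>(input.getValues());

        System.out.println("wrapped tuple serializable: " + canSerialize(wrapped));
        System.out.println("plain values serializable: " + canSerialize(unwrapped));
    }
}
```

In this model, the wrapped payload fails because serializing the tuple also 
tries to serialize its context, while the plain values go through.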
 

What are your thoughts on it?

 

> Processing late tuples from BaseWindowedBolt results in serialization 
> exception
> -------------------------------------------------------------------------------
>
>                 Key: STORM-4000
>                 URL: https://issues.apache.org/jira/browse/STORM-4000
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-client
>    Affects Versions: 2.5.0
>         Environment: Ubuntu 22.04, OpenJDK 11, Apache Storm 2.5.0
>            Reporter: Jawad Tahir
>            Priority: Major
>              Labels: latearrivingevents, serialization, windowing
>
> I am developing an Apache Storm (v2.5.0) topology that reads events from a 
> spout ({_}BaseRichSpout{_}), counts the number of events in tumbling windows 
> ({_}BaseWindowedBolt{_}), and prints the count ({_}BaseRichBolt{_}). The 
> topology works fine, but there are some out-of-order events in my dataset. 
> The _BaseWindowedBolt_ provides the _withLateTupleStream_ method to route late 
> events to a separate stream. However, when I try to process late events, I 
> get a serialization exception:
> {code:java}
> Caused by: com.esotericsoftware.kryo.KryoException: 
> java.lang.IllegalArgumentException: Class is not registered: 
> org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap
> Note: To register this class use: 
> kryo.register(org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap.class);
> Serialization trace:
> defaultResources (org.apache.storm.task.WorkerTopologyContext)
> context (org.apache.storm.tuple.TupleImpl)
>     at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) 
> ~[kryo-4.0.2.jar:?]
>     at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
>  ~[kryo-4.0.2.jar:?]
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575) 
> ~[kryo-4.0.2.jar:?]
>     at 
> com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79) 
> ~[kryo-4.0.2.jar:?]
>     at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
>  ~[kryo-4.0.2.jar:?]
>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) 
> ~[kryo-4.0.2.jar:?]
>     at 
> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
>  ~[kryo-4.0.2.jar:?]
>     at 
> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
>  ~[kryo-4.0.2.jar:?]
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:557) 
> ~[kryo-4.0.2.jar:?]
>     at 
> org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38)
>  ~[storm-client-2.5.0.jar:2.5.0]
>     at 
> org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40)
>  ~[storm-client-2.5.0.jar:2.5.0]
>     at 
> org.apache.storm.daemon.worker.WorkerState.checkSerialize(WorkerState.java:613)
>  ~[storm-client-2.5.0.jar:2.5.0]
>     at 
> org.apache.storm.executor.ExecutorTransfer.tryTransferLocal(ExecutorTransfer.java:101)
>  ~[storm-client-2.5.0.jar:2.5.0]
>     at 
> org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:66)
>  ~[storm-client-2.5.0.jar:2.5.0]
>     at 
> org.apache.storm.executor.LocalExecutor$1.tryTransfer(LocalExecutor.java:36) 
> ~[storm-client-2.5.0.jar:2.5.0]
>     at 
> org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112)
>  ~[storm-client-2.5.0.jar:2.5.0]
>     at 
> org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65)
>  ~[storm-client-2.5.0.jar:2.5.0]
>     at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) 
> ~[storm-client-2.5.0.jar:2.5.0]
>     at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) 
> ~[storm-client-2.5.0.jar:2.5.0]
>     at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42) 
> ~[storm-client-2.5.0.jar:2.5.0]
>     at 
> org.apache.storm.topology.WindowedBoltExecutor.execute(WindowedBoltExecutor.java:313)
>  ~[storm-client-2.5.0.jar:2.5.0]
>     at 
> org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212)
>  ~[storm-client-2.5.0.jar:2.5.0]
>     at org.apache.storm.executor.Executor.accept(Executor.java:294) 
> ~[storm-client-2.5.0.jar:2.5.0]
>     ... 6 more{code}
> I have defined my topology as below:
>  
>  
> {code:java}
> public class TestTopology {
>     public static void main (String[] args) throws Exception {
>         Config config = new Config();
>         config.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
>         config.registerSerialization(TupleImpl.class);
>         config.registerSerialization(WorkerTopologyContext.class);
>         config.registerSerialization(Fields.class);
>         LocalCluster cluster = new LocalCluster();
>         try (LocalCluster.LocalTopology topology =
>                  cluster.submitTopology("testTopology", config,
>                      getTopology().createTopology())) {
>             Thread.sleep(50000);
>         }
>         cluster.shutdown();
>     }
>     static TopologyBuilder getTopology(){
>         TopologyBuilder builder = new TopologyBuilder();
>         builder.setSpout("eventSpout", new LateEventSpout());
>         builder.setBolt("windowBolt", new WindowBolt().
>                         withTumblingWindow(BaseWindowedBolt.Duration.seconds(10)).
>                         withTimestampField("time").
>                         withLateTupleStream("lateEvents")).
>                         shuffleGrouping("eventSpout");
>         builder.setBolt("latePrintBolt", new LatePrintBolt()).
>                         shuffleGrouping("windowBolt", "lateEvents");
>         builder.setBolt("printBolt", new PrintBolt()).
>                         shuffleGrouping("windowBolt");
>         return builder;
>     }
> }{code}
> Where `LateEventSpout` is
> {code:java}
> public class LateEventSpout extends BaseRichSpout {
>     private SpoutOutputCollector collector;
>     private List<Long> eventTimes;
>     private int currentTime = 0;
>     private int id = 1;
>     public LateEventSpout () {
>         eventTimes = new ArrayList<>();
>         for (int i = 1; i<= 61; i++) {
>             eventTimes.add(Instant.EPOCH.plusSeconds(i).toEpochMilli());
>         } // eventTimes = [epoch+1, epoch+2, .., epoch+61]
>     }
>     @Override
>     public void open(Map<String, Object> conf, TopologyContext context,
>                      SpoutOutputCollector collector) {
>         this.collector = collector;
>     }
>     @Override
>     public void nextTuple() {
>         int eventId = id++;
>         Long eventTime = eventTimes.get(currentTime++);
>         if (currentTime == eventTimes.size()){
>             currentTime = 0;
>         }
>         collector.emit(new Values(eventId, eventTime));
>     }
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         declarer.declare(new Fields("id", "time"));
>     }
> } {code}
> And `WindowBolt` is:
> {code:java}
> public class WindowBolt extends BaseWindowedBolt {
>     OutputCollector collector;
>     @Override
>     public void prepare(Map<String, Object> topoConf, TopologyContext context,
>                         OutputCollector collector) {
>         this.collector = collector;
>     }
>     @Override
>     public void execute(TupleWindow inputWindow) {
>         int sum = 0;
>         for (Tuple event : inputWindow.get()){
>             sum++;
>         }
>         collector.emit(new Values(inputWindow.getStartTimestamp(),
>                 inputWindow.getEndTimestamp(), sum));
>     }
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         declarer.declare(new Fields("start", "end", "sum"));
>     }
> } {code}
>  
> `PrintBolt` just prints the `windowBolt` output (`LatePrintBolt` is similar). 
> If I don't add `LatePrintBolt` to the `TopologyBuilder`, I get the correct 
> results, shown after the `PrintBolt` code:
>  
> {code:java}
> public class PrintBolt extends BaseRichBolt {
>     @Override
>     public void prepare(Map<String, Object> topoConf, TopologyContext context,
>                         OutputCollector collector) {
>     }
>     
>     @Override
>     public void execute(Tuple input) {
>         System.out.println(String.format("Start: %d, End: %d, Sum:%d",
>                 input.getLongByField("start"), input.getLongByField("end"),
>                 input.getIntegerByField("sum")));
>     }
>     
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>     }
> } {code}
> {code:java}
> Start: 0, End: 10000, Sum:10
> Start: 10000, End: 20000, Sum:10
> Start: 20000, End: 30000, Sum:10
> Start: 30000, End: 40000, Sum:10
> Start: 40000, End: 50000, Sum:10
> Start: 50000, End: 60000, Sum:10 {code}
>  
> However, when I try to print the lateEvents stream, I get the same output 
> until the first late event arrives, at which point the above-mentioned 
> exception is thrown.
>  
> I have debugged the issue. When 
> [WindowedBoltExecutor|https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java#L313]
>  receives a late tuple, it emits the late tuple, but 
> [BoltOutputCollectorImpl|https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java#L110]
>  rewraps it in a new Tuple. This new tuple contains 
> _WorkerTopologyContext_, which is not serializable, hence the error.
>  



