[ https://issues.apache.org/jira/browse/STORM-4000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jawad Tahir updated STORM-4000:
-------------------------------
Description:

I am developing an Apache Storm (v2.5.0) topology that reads events from a spout ({_}BaseRichSpout{_}), counts the number of events in tumbling windows ({_}BaseWindowedBolt{_}), and prints the count ({_}BaseRichBolt{_}). The topology works fine, but my dataset contains some out-of-order events. _BaseWindowedBolt_ provides the _withLateTupleStream_ method to route late events to a separate stream. However, when I try to process the late events, I get a serialization exception:

{code:java}
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap
Note: To register this class use: kryo.register(org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap.class);
Serialization trace:
defaultResources (org.apache.storm.task.WorkerTopologyContext)
context (org.apache.storm.tuple.TupleImpl)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:557) ~[kryo-4.0.2.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.daemon.worker.WorkerState.checkSerialize(WorkerState.java:613) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.ExecutorTransfer.tryTransferLocal(ExecutorTransfer.java:101) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:66) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.LocalExecutor$1.tryTransfer(LocalExecutor.java:36) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.topology.WindowedBoltExecutor.execute(WindowedBoltExecutor.java:313) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.5.0.jar:2.5.0]
    ... 6 more
{code}

I have defined my topology as below:

{code:java}
public class TestTopology {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
        config.registerSerialization(TupleImpl.class);
        config.registerSerialization(WorkerTopologyContext.class);
        config.registerSerialization(Fields.class);
        LocalCluster cluster = new LocalCluster();
        try (LocalCluster.LocalTopology topology = cluster.submitTopology("testTopology", config,
                getTopology().createTopology())) {
            Thread.sleep(50000);
        }
        cluster.shutdown();
    }

    static TopologyBuilder getTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("eventSpout", new LateEventSpout());
        builder.setBolt("windowBolt", new WindowBolt()
                        .withTumblingWindow(BaseWindowedBolt.Duration.seconds(10))
                        .withTimestampField("time")
                        .withLateTupleStream("lateEvents"))
                .shuffleGrouping("eventSpout");
        builder.setBolt("latePrintBolt", new LatePrintBolt())
                .shuffleGrouping("windowBolt", "lateEvents");
        builder.setBolt("printBolt", new PrintBolt()).shuffleGrouping("windowBolt");
        return builder;
    }
}
{code}

Where `LateEventSpout` is:

{code:java}
public class LateEventSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private List<Long> eventTimes;
    private int currentTime = 0;
    private int id = 1;

    public LateEventSpout() {
        eventTimes = new ArrayList<>();
        for (int i = 1; i <= 61; i++) {
            eventTimes.add(Instant.EPOCH.plusSeconds(i).toEpochMilli());
        } // eventTimes = [epoch+1s, epoch+2s, ..., epoch+61s]
    }

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        int eventId = id++;
        Long eventTime = eventTimes.get(currentTime++);
        if (currentTime == eventTimes.size()) {
            currentTime = 0; // wrap around: the next event reuses the oldest timestamp
        }
        collector.emit(new Values(eventId, eventTime));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "time"));
    }
}
{code}

And `WindowBolt` is:

{code:java}
public class WindowBolt extends BaseWindowedBolt {
    OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        int sum = 0;
        for (Tuple event : inputWindow.get()) {
            sum++;
        }
        collector.emit(new Values(inputWindow.getStartTimestamp(), inputWindow.getEndTimestamp(), sum));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("start", "end", "sum"));
    }
}
{code}

And `PrintBolt` just prints the `windowBolt` output.
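To see where a late tuple comes from in this setup: the spout wraps its timestamp index back to the oldest timestamp after 61 events, so event 62 carries a timestamp roughly a minute older than everything already seen. The following standalone sketch (no Storm dependency; the fixed one-second watermark lag and the `ts <= watermark` rule are illustrative assumptions, not Storm's exact WaterMarkEventGenerator logic) simulates which event ids would be classified late:

```java
import java.util.ArrayList;
import java.util.List;

public class LateEventSimulation {
    /** Returns the ids of events that fall behind a simulated watermark. */
    static List<Integer> simulateLateIds(int totalEvents, long lagMs) {
        // Same timestamps as LateEventSpout: epoch+1s .. epoch+61s, in millis.
        List<Long> eventTimes = new ArrayList<>();
        for (int i = 1; i <= 61; i++) {
            eventTimes.add(i * 1000L);
        }
        List<Integer> lateIds = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        int currentTime = 0;
        for (int id = 1; id <= totalEvents; id++) {
            long ts = eventTimes.get(currentTime++);
            if (currentTime == eventTimes.size()) {
                currentTime = 0; // wrap around, as the spout does
            }
            long watermark = maxSeen - lagMs; // assumed watermark rule, for illustration only
            if (ts <= watermark) {
                lateIds.add(id); // would be routed to the "lateEvents" stream
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return lateIds;
    }

    public static void main(String[] args) {
        // Emit a few events past the wrap-around point.
        System.out.println("late event ids: " + simulateLateIds(70, 1000L));
        // prints: late event ids: [62, 63, 64, 65, 66, 67, 68, 69, 70]
    }
}
```

Under these assumptions, event 62 (the first wrap-around event) is the first one flagged late, which lines up with the exception appearing only once the spout wraps around.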
`LatePrintBolt` is similar to `PrintBolt`, shown below. If I don't include `LatePrintBolt` in the `TopologyBuilder`, I get the correct results:

{code:java}
public class PrintBolt extends BaseRichBolt {
    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
        System.out.println(String.format("Start: %d, End: %d, Sum: %d",
                input.getLongByField("start"), input.getLongByField("end"), input.getIntegerByField("sum")));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
{code}

{code:java}
Start: 0, End: 10000, Sum: 10
Start: 10000, End: 20000, Sum: 10
Start: 20000, End: 30000, Sum: 10
Start: 30000, End: 40000, Sum: 10
Start: 40000, End: 50000, Sum: 10
Start: 50000, End: 60000, Sum: 10
{code}

However, when I try to print the lateEvents stream, I get the same output until the first late event arrives, at which point I get the above-mentioned exception.

I have debugged the issue. When [WindowedBoltExecutor|https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java#L313] receives a late tuple, it emits it on the late stream, but [BoltOutputCollectorImpl|https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java#L110] re-wraps it in a new Tuple. This new tuple carries a _WorkerTopologyContext_, which is not serializable, hence the error.

was: (previous description; identical except that it closed with "I would like to know how I can process the late tuples.")
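For intuition only: Storm serializes tuples with Kryo rather than `java.io` serialization, but the failure mode described above can be mimicked with plain JDK classes. A wrapper that is itself serializable still fails to serialize when it drags along a non-serializable context object. `FakeWorkerContext` and `FakeTuple` below are hypothetical stand-ins for WorkerTopologyContext and the re-wrapped TupleImpl, not Storm types:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class RewrapDemo {
    // Stand-in for WorkerTopologyContext: runtime state never meant to cross the wire.
    static class FakeWorkerContext {
        final Object defaultResources = new Object();
    }

    // Stand-in for the re-wrapped late tuple: the payload is fine,
    // but the embedded context field is not serializable.
    static class FakeTuple implements Serializable {
        final String payload;
        final FakeWorkerContext context; // non-serializable field dragged along

        FakeTuple(String payload, FakeWorkerContext context) {
            this.payload = payload;
            this.context = context;
        }
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // same class of failure Kryo reports for the embedded context
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes("plain payload"));                                      // true
        System.out.println(serializes(new FakeTuple("late event", new FakeWorkerContext()))); // false
    }
}
```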
> Processing late tuples from BaseWindowedBolt results in serialization exception
> -------------------------------------------------------------------------------
>
>                 Key: STORM-4000
>                 URL: https://issues.apache.org/jira/browse/STORM-4000
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-client
>    Affects Versions: 2.5.0
>        Environment: Ubuntu 22.04, OpenJDK 11, Apache Storm 2.5.0
>           Reporter: Jawad Tahir
>           Priority: Major
>             Labels: latearrivingevents, serialization, windowing

--
This message was sent by Atlassian Jira
(v8.20.10#820010)