[
https://issues.apache.org/jira/browse/STORM-4000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jawad Tahir updated STORM-4000:
-------------------------------
Description:
I am developing an Apache Storm (v2.5.0) topology that reads events from a
spout ({_}BaseRichSpout{_}), counts the number of events in tumbling windows
({_}BaseWindowedBolt{_}), and prints the count ({_}BaseRichBolt{_}). The
topology works fine, but my dataset contains some out-of-order events.
_BaseWindowedBolt_ provides the _withLateTupleStream_ method to route late
events to a separate stream. However, when I try to process the late events, I
get a serialization exception:
{code:java}
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap
Note: To register this class use: kryo.register(org.apache.storm.shade.com.google.common.collect.SingletonImmutableBiMap.class);
Serialization trace:
defaultResources (org.apache.storm.task.WorkerTopologyContext)
context (org.apache.storm.tuple.TupleImpl)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-4.0.2.jar:?]
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:557) ~[kryo-4.0.2.jar:?]
    at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.daemon.worker.WorkerState.checkSerialize(WorkerState.java:613) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.ExecutorTransfer.tryTransferLocal(ExecutorTransfer.java:101) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:66) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.LocalExecutor$1.tryTransfer(LocalExecutor.java:36) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.topology.WindowedBoltExecutor.execute(WindowedBoltExecutor.java:313) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.5.0.jar:2.5.0]
    at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.5.0.jar:2.5.0]
    ... 6 more{code}
I have defined my topology as below:
{code:java}
public class TestTopology {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.put(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE, true);
        config.registerSerialization(TupleImpl.class);
        config.registerSerialization(WorkerTopologyContext.class);
        config.registerSerialization(Fields.class);
        LocalCluster cluster = new LocalCluster();
        try (LocalCluster.LocalTopology topology =
                 cluster.submitTopology("testTopology", config, getTopology().createTopology())) {
            Thread.sleep(50000);
        }
        cluster.shutdown();
    }

    static TopologyBuilder getTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("eventSpout", new LateEventSpout());
        builder.setBolt("windowBolt", new WindowBolt()
                .withTumblingWindow(BaseWindowedBolt.Duration.seconds(10))
                .withTimestampField("time")
                .withLateTupleStream("lateEvents"))
            .shuffleGrouping("eventSpout");
        builder.setBolt("latePrintBolt", new LatePrintBolt())
            .shuffleGrouping("windowBolt", "lateEvents");
        builder.setBolt("printBolt", new PrintBolt())
            .shuffleGrouping("windowBolt");
        return builder;
    }
}{code}
Where `LateEventSpout` is
{code:java}
public class LateEventSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private List<Long> eventTimes;
    private int currentTime = 0;
    private int id = 1;

    public LateEventSpout() {
        eventTimes = new ArrayList<>();
        for (int i = 1; i <= 61; i++) {
            eventTimes.add(Instant.EPOCH.plusSeconds(i).toEpochMilli());
        } // eventTimes = [epoch+1, epoch+2, .., epoch+61]
    }

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        int eventId = id++;
        Long eventTime = eventTimes.get(currentTime++);
        if (currentTime == eventTimes.size()) {
            // Wrap around, so subsequent emits carry earlier (i.e. late) timestamps.
            currentTime = 0;
        }
        collector.emit(new Values(eventId, eventTime));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "time"));
    }
} {code}
And `WindowBolt` is:
{code:java}
public class WindowBolt extends BaseWindowedBolt {
    OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        int sum = 0;
        for (Tuple event : inputWindow.get()) {
            sum++;
        }
        collector.emit(new Values(inputWindow.getStartTimestamp(), inputWindow.getEndTimestamp(), sum));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("start", "end", "sum"));
    }
} {code}
`PrintBolt` just prints the `windowBolt` output (`LatePrintBolt` is similar).
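For reference, a minimal sketch of what such a `LatePrintBolt` could look like; this is only an illustration, not my exact implementation, and it makes no assumptions about the field names on the late stream:
{code:java}
public class LatePrintBolt extends BaseRichBolt {
    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
        // Illustrative sketch: just log whatever arrives on the "lateEvents" stream.
        System.out.println("Late tuple: " + input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
} {code}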
If I don't add `latePrintBolt` to the `TopologyBuilder`, I get the correct
results:
{code:java}
public class PrintBolt extends BaseRichBolt {
    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
        System.out.println(String.format("Start: %d, End: %d, Sum:%d",
                input.getLongByField("start"), input.getLongByField("end"),
                input.getIntegerByField("sum")));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
} {code}
{code:java}
Start: 0, End: 10000, Sum:10
Start: 10000, End: 20000, Sum:10
Start: 20000, End: 30000, Sum:10
Start: 30000, End: 40000, Sum:10
Start: 40000, End: 50000, Sum:10
Start: 50000, End: 60000, Sum:10 {code}
However, when I try to print the `lateEvents` stream, I get the same window
output, but as soon as the first late event arrives (after the spout wraps
around to the start of `eventTimes`, its timestamps fall behind the watermark),
I get the above-mentioned exception.
I have debugged the issue. When
[WindowedBoltExecutor|https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java#L313]
receives a late tuple, it emits the late tuple on the late stream, but
[BoltOutputCollectorImpl|https://github.com/apache/storm/blob/18341682ce90976c173ecf9ac68582b1626bda8a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java#L110]
wraps it in a new Tuple. This new tuple therefore carries the original
_TupleImpl_ and its _WorkerTopologyContext_ (the _context_ and
_defaultResources_ entries in the serialization trace above), which Kryo cannot
serialize, hence the error.
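To make the mechanism concrete, the late-tuple path is roughly of the following shape. This is a paraphrased sketch based on the linked sources, not a verbatim copy of the Storm code:
{code:java}
// Paraphrased sketch (my reading of the linked sources, not the exact Storm code)
// of what WindowedBoltExecutor does when a late tuple arrives and a late tuple
// stream has been configured:
if (lateTupleStream != null) {
    // The late TupleImpl itself is emitted as a value on the late stream,
    // anchored to the original input tuple.
    windowedOutputCollector.emit(lateTupleStream, input, new Values(input));
}
// BoltOutputCollectorImpl then builds a new tuple around these values before
// transfer. Serializing that tuple pulls in the embedded TupleImpl's
// WorkerTopologyContext, whose defaultResources map holds the unregistered
// shaded Guava SingletonImmutableBiMap shown in the exception above.
{code}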
> Processing late tuples from BaseWindowedBolt results in serialization
> exception
> -------------------------------------------------------------------------------
>
> Key: STORM-4000
> URL: https://issues.apache.org/jira/browse/STORM-4000
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-client
> Affects Versions: 2.5.0
> Environment: Ubuntu 22.04, OpenJDK 11, Apache Storm 2.5.0
> Reporter: Jawad Tahir
> Priority: Major
> Labels: latearrivingevents, serialization, windowing
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)