This is an automated email from the ASF dual-hosted git repository.
rabreu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 6808fbc43 Porting changes from STORM-3693 (#3828)
6808fbc43 is described below
commit 6808fbc43f486ccae35279872f71fbcfc749a8ac
Author: reiabreu <[email protected]>
AuthorDate: Sun Jan 12 13:51:58 2025 +0000
Porting changes from STORM-3693 (#3828)
---
storm-client/src/jvm/org/apache/storm/executor/Executor.java | 6 ++++++
1 file changed, 6 insertions(+)
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index b3df9c9e6..2d287c777 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -285,6 +285,8 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
int taskId = addressedTuple.getDest();
TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
+ String streamId = tuple.getSourceStreamId();
+ boolean isSpout = this instanceof SpoutExecutor;
if (isDebug) {
LOG.info("Processing received TUPLE: {} for TASK: {} ", tuple,
taskId);
}
@@ -292,6 +294,10 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
try {
if (taskId != AddressedTuple.BROADCAST_DEST) {
tupleActionFn(taskId, tuple);
+ } else if (isSpout &&
streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
+ //taskId is irrelevant here. Ensures pending.rotate() is called once per tick.
+ tupleActionFn(taskIds.get(0), tuple);
+
} else {
for (Integer t : taskIds) {
tupleActionFn(t, tuple);