This is an automated email from the ASF dual-hosted git repository.

rabreu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 6808fbc43 Porting changes from STORM-3693 (#3828)
6808fbc43 is described below

commit 6808fbc43f486ccae35279872f71fbcfc749a8ac
Author: reiabreu <[email protected]>
AuthorDate: Sun Jan 12 13:51:58 2025 +0000

    Porting changes from STORM-3693 (#3828)
---
 storm-client/src/jvm/org/apache/storm/executor/Executor.java | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java 
b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index b3df9c9e6..2d287c777 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -285,6 +285,8 @@ public abstract class Executor implements Callable, 
JCQueue.Consumer {
         int taskId = addressedTuple.getDest();
 
         TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
+        String streamId = tuple.getSourceStreamId();
+        boolean isSpout = this instanceof SpoutExecutor;
         if (isDebug) {
             LOG.info("Processing received TUPLE: {} for TASK: {} ", tuple, 
taskId);
         }
@@ -292,6 +294,10 @@ public abstract class Executor implements Callable, 
JCQueue.Consumer {
         try {
             if (taskId != AddressedTuple.BROADCAST_DEST) {
                 tupleActionFn(taskId, tuple);
+            } else if (isSpout && 
streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
+                //taskId is irrelevant here. Ensures pending.rotate() is 
called once per tick.
+                tupleActionFn(taskIds.get(0), tuple);
+
             } else {
                 for (Integer t : taskIds) {
                     tupleActionFn(t, tuple);

Reply via email to