starmilkxin commented on PR #296:
URL: https://github.com/apache/rocketmq-streams/pull/296#issuecomment-1630287692

   I did some simple tests, and the windowEnd will change with the arrival of 
data.
   Here the sessionTimeout of sessionWindow is 5 seconds. Each message sent 
causes it to keep updating windowEnd. Pause for 4 seconds after each message is 
sent, a total of 5 pieces of data are sent, and it takes 4 * 4 + 5 = 21 seconds.
   
   ```java
   public class UserProducer {
       private static final String topic = "windowCount";
       private static final Random random = new Random();
       public static void main(String[] args) throws Exception {
   
           DefaultMQProducer producer = new DefaultMQProducer("Pro_Group");
   
           producer.setNamesrvAddr("localhost:9876");
   
           producer.start();
   
           long time = 4000000000000L;
   
           for (int i = 0; i < 5; i++) {
               User user = new User("小红" + i, 13, time + 4000*i);
               byte[] body = JSON.toJSONBytes(user);
               Message msg = new Message(topic, "", body);
               SendResult sendResult = producer.send(msg);
               System.out.printf("%s\n", user);
               Thread.sleep(4000);
           }
   
           //Shut down once the producer instance is not longer in use.
           producer.shutdown();
       }
   }
   
   User{name='小红0', age=13, timeStamp=4000000000000}
   User{name='小红1', age=13, timeStamp=4000000004000}
   User{name='小红2', age=13, timeStamp=4000000008000}
   User{name='小红3', age=13, timeStamp=4000000012000}
   User{name='小红4', age=13, timeStamp=4000000016000}
   ```
   
   ```java
   // AccumulatorSessionWindowFire.fire
                   logger.info("fire session,windowKey={}, search keyPrefix={}, 
window: [{} - {}]",
                           windowKey, state.getKey().toString(), 
Utils.format(windowBegin), Utils.format(windowEnd));
   
   2023-07-11 00:27:49.121 [ScanIdleWindowThread_1] INFO  
o.a.r.s.c.window.fire.AccumulatorSessionWindowFire - fire 
session,windowKey=sessionWindowCount-ROCKETMQ-COUNT-00004@SessionWindowAccumulatorProcessor&&31a7fab88e791062b8f7a30640648be6&&1689006466145&&1689006445131,
 search keyPrefix=13, window: [2023-07-11 00:27:25 - 2023-07-11 00:27:46]
   ```
   
   ```java
   // PrintSupplier.process
                   String template = "(key=%s, value=%s)";
   
                   Data<Object, T> result = new Data<>(this.context.getKey(), 
data, this.context.getDataTime(), header);
                   String format = String.format(template, result.getKey(), 
data.toString());
   
                   System.out.println(format);
   
   
   [2023-07-11 00:27:25 - 2023-07-11 00:27:46](key=13, value=5)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to