This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ff48dfa modify method note.
9ff48dfa is described below

commit 9ff48dfa216747c83565379d6c2cc268f1b69b80
Author: 维章 <[email protected]>
AuthorDate: Fri Jan 6 11:03:47 2023 +0800

    modify method note.
---
 .../apache/rocketmq/streams/core/function/FilterAction.java |  2 +-
 .../streams/core/function/accumulator/Accumulator.java      |  7 ++++---
 .../core/function/supplier/WindowAccumulatorSupplier.java   |  8 +-------
 .../core/function/supplier/WindowAggregateSupplier.java     |  8 +-------
 .../rocketmq/streams/core/running/AbstractProcessor.java    |  6 ++++++
 .../org/apache/rocketmq/streams/core/state/StateStore.java  | 13 +++----------
 6 files changed, 16 insertions(+), 28 deletions(-)

diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/FilterAction.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/FilterAction.java
index e0d63699..baa15e8d 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/FilterAction.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/FilterAction.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.streams.core.function;
 public interface FilterAction<T> {
     /**
      *
-     * @param value
+     * @param value the target to filter
      * @return pass if true;
      */
     boolean apply(final T value);
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/accumulator/Accumulator.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/accumulator/Accumulator.java
index 8e9c26ef..c30d5344 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/accumulator/Accumulator.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/accumulator/Accumulator.java
@@ -25,9 +25,10 @@ public interface Accumulator<V, R> {
     void merge(Accumulator<V, R> other);
 
     /**
-     * 状态触发后调用,context可以加入状态触发后产生的某些条件,传递给算子
-     * @param context
-     * @return
+     * invoked after the window fired.
+     *
+     * @param context the attached properties after window fired.
+     * @return the value.
      */
     R result(Properties context);
 
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
index e340e022..54f83646 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
@@ -336,13 +336,7 @@ public class WindowAccumulatorSupplier<K, V, R, OV> 
implements Supplier<Processo
     public abstract class CommonWindowFire extends AbstractWindowProcessor<V> {
         protected WindowStore<K, Accumulator<R, OV>> windowStore;
 
-        /**
-         * 触发窗口结束时间 <= watermark 的窗口
-         *
-         * @param watermark
-         * @param key
-         * @throws Throwable
-         */
+
         protected void fireWindowEndTimeLassThanWatermark(long watermark, 
String operatorName, K key) throws Throwable {
             WindowKey windowKeyWatermark = new WindowKey(operatorName, 
toHexString(key), watermark, 0L);
 
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
index d6ca254e..5eae1023 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
@@ -332,13 +332,7 @@ public class WindowAggregateSupplier<K, V, OV> implements 
Supplier<Processor<V>>
     public abstract class CommonWindowFire extends AbstractWindowProcessor<V> {
         protected WindowStore<K, OV> windowStore;
 
-        /**
-         * 触发窗口结束时间 <= watermark 的窗口
-         *
-         * @param watermark
-         * @param key
-         * @throws Throwable
-         */
+
         protected void fireWindowEndTimeLassThanWatermark(long watermark, 
String operatorName, K key) throws Throwable {
             WindowKey windowKeyWatermark = new WindowKey(operatorName, 
toHexString(key), watermark, 0L);
 
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractProcessor.java
 
b/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractProcessor.java
index 2b7e0952..0998de2d 100644
--- 
a/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractProcessor.java
+++ 
b/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractProcessor.java
@@ -94,6 +94,9 @@ public abstract class AbstractProcessor<T> implements 
Processor<T> {
      * | classname |              |object length |             |
      * +-----------+--------------+---------------+-------------+
      * </pre>
+     * @param obj the object to serialize;
+     * @return byte[] the result of serialize
+     * @throws JsonProcessingException serialize exception.
      */
     protected byte[] object2Byte(Object obj) throws JsonProcessingException {
         if (obj == null) {
@@ -126,6 +129,9 @@ public abstract class AbstractProcessor<T> implements 
Processor<T> {
      * | classname |              |object length |             |
      * +-----------+--------------+---------------+-------------+
      * </pre>
+     * @param bytes the byte array to deserialize;
+     * @return V the result of deserialize
+     * @throws Throwable deserialize exception.
      */
     @SuppressWarnings("unchecked")
     public <V> V byte2Object(byte[] bytes) throws Throwable {
diff --git 
a/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java 
b/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java
index 4a0ce0b7..7f75361c 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java
@@ -28,19 +28,12 @@ import java.util.Set;
 public interface StateStore extends AutoCloseable {
     void init() throws Throwable;
 
-    /**
-     * @param addQueues    messageQueue of source topic
-     * @param removeQueues messageQueue of source topic
-     * @throws Throwable
-     */
+
+    //addQueues    messageQueue of source topic,removeQueues messageQueue of 
source topic
     void recover(Set<MessageQueue> addQueues, Set<MessageQueue> removeQueues) 
throws Throwable;
 
 
-    /**
-     * @param messageQueue 检查source topic中该queue的状态是否已经加载好,如果没有加载好,等待加载
-     * @throws Throwable
-     */
-    //如果没准备好,会阻塞
+    //messageQueue check the state of source topic is ok, wait if not.
     void waitIfNotReady(MessageQueue messageQueue) throws 
RecoverStateStoreThrowable;
 
 

Reply via email to