This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
The following commit(s) were added to refs/heads/main by this push:
new 9ff48dfa modify method note.
9ff48dfa is described below
commit 9ff48dfa216747c83565379d6c2cc268f1b69b80
Author: 维章 <[email protected]>
AuthorDate: Fri Jan 6 11:03:47 2023 +0800
modify method note.
---
.../apache/rocketmq/streams/core/function/FilterAction.java | 2 +-
.../streams/core/function/accumulator/Accumulator.java | 7 ++++---
.../core/function/supplier/WindowAccumulatorSupplier.java | 8 +-------
.../core/function/supplier/WindowAggregateSupplier.java | 8 +-------
.../rocketmq/streams/core/running/AbstractProcessor.java | 6 ++++++
.../org/apache/rocketmq/streams/core/state/StateStore.java | 13 +++----------
6 files changed, 16 insertions(+), 28 deletions(-)
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/function/FilterAction.java b/core/src/main/java/org/apache/rocketmq/streams/core/function/FilterAction.java
index e0d63699..baa15e8d 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/function/FilterAction.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/function/FilterAction.java
@@ -20,7 +20,7 @@ package org.apache.rocketmq.streams.core.function;
public interface FilterAction<T> {
/**
*
- * @param value
+ * @param value the target to filter
* @return pass if true;
*/
boolean apply(final T value);
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/function/accumulator/Accumulator.java b/core/src/main/java/org/apache/rocketmq/streams/core/function/accumulator/Accumulator.java
index 8e9c26ef..c30d5344 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/function/accumulator/Accumulator.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/function/accumulator/Accumulator.java
@@ -25,9 +25,10 @@ public interface Accumulator<V, R> {
void merge(Accumulator<V, R> other);
/**
- * 状态触发后调用,context可以加入状态触发后产生的某些条件,传递给算子
- * @param context
- * @return
+ * invoked after the window fired.
+ *
+ * @param context the attached properties after window fired.
+ * @return the value.
*/
R result(Properties context);
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
index e340e022..54f83646 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAccumulatorSupplier.java
@@ -336,13 +336,7 @@ public class WindowAccumulatorSupplier<K, V, R, OV> implements Supplier<Processo
public abstract class CommonWindowFire extends AbstractWindowProcessor<V> {
protected WindowStore<K, Accumulator<R, OV>> windowStore;
- /**
- * 触发窗口结束时间 <= watermark 的窗口
- *
- * @param watermark
- * @param key
- * @throws Throwable
- */
+
protected void fireWindowEndTimeLassThanWatermark(long watermark, String operatorName, K key) throws Throwable {
WindowKey windowKeyWatermark = new WindowKey(operatorName, toHexString(key), watermark, 0L);
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
index d6ca254e..5eae1023 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/WindowAggregateSupplier.java
@@ -332,13 +332,7 @@ public class WindowAggregateSupplier<K, V, OV> implements Supplier<Processor<V>>
public abstract class CommonWindowFire extends AbstractWindowProcessor<V> {
protected WindowStore<K, OV> windowStore;
- /**
- * 触发窗口结束时间 <= watermark 的窗口
- *
- * @param watermark
- * @param key
- * @throws Throwable
- */
+
protected void fireWindowEndTimeLassThanWatermark(long watermark, String operatorName, K key) throws Throwable {
WindowKey windowKeyWatermark = new WindowKey(operatorName, toHexString(key), watermark, 0L);
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractProcessor.java b/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractProcessor.java
index 2b7e0952..0998de2d 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractProcessor.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/running/AbstractProcessor.java
@@ -94,6 +94,9 @@ public abstract class AbstractProcessor<T> implements Processor<T> {
* | classname | |object length | |
* +-----------+--------------+---------------+-------------+
* </pre>
+ * @param obj the object to serialize;
+ * @return byte[] the result of serialize
+ * @throws JsonProcessingException serialize exception.
*/
protected byte[] object2Byte(Object obj) throws JsonProcessingException {
if (obj == null) {
@@ -126,6 +129,9 @@ public abstract class AbstractProcessor<T> implements Processor<T> {
* | classname | |object length | |
* +-----------+--------------+---------------+-------------+
* </pre>
+ * @param bytes the byte array to deserialize;
+ * @return V the result of deserialize
+ * @throws Throwable deserialize exception.
*/
@SuppressWarnings("unchecked")
public <V> V byte2Object(byte[] bytes) throws Throwable {
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java b/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java
index 4a0ce0b7..7f75361c 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java
@@ -28,19 +28,12 @@ import java.util.Set;
public interface StateStore extends AutoCloseable {
void init() throws Throwable;
- /**
- * @param addQueues messageQueue of source topic
- * @param removeQueues messageQueue of source topic
- * @throws Throwable
- */
+
+ //addQueues messageQueue of source topic,removeQueues messageQueue of source topic
void recover(Set<MessageQueue> addQueues, Set<MessageQueue> removeQueues) throws Throwable;
- /**
- * @param messageQueue 检查source topic中该queue的状态是否已经加载好,如果没有加载好,等待加载
- * @throws Throwable
- */
- //如果没准备好,会阻塞
+ //messageQueue check the state of source topic is ok, wait if not.
void waitIfNotReady(MessageQueue messageQueue) throws RecoverStateStoreThrowable;