This is an automated email from the ASF dual-hosted git repository.
jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0779c91e581 [FLINK-34258][docs][table] Fix incorrect retract example
for TableAggregateFunction
0779c91e581 is described below
commit 0779c91e581dc16c4aef61d6cc27774f11495907
Author: Jane Chan <[email protected]>
AuthorDate: Fri Feb 2 10:10:40 2024 +0800
[FLINK-34258][docs][table] Fix incorrect retract example for
TableAggregateFunction
This closes #24215
---
docs/content.zh/docs/dev/table/functions/udfs.md | 13 ++++++++-----
docs/content/docs/dev/table/functions/udfs.md | 14 +++++++-------
2 files changed, 15 insertions(+), 12 deletions(-)
diff --git a/docs/content.zh/docs/dev/table/functions/udfs.md
b/docs/content.zh/docs/dev/table/functions/udfs.md
index bf10e3b6482..5527cfcb978 100644
--- a/docs/content.zh/docs/dev/table/functions/udfs.md
+++ b/docs/content.zh/docs/dev/table/functions/udfs.md
@@ -1892,7 +1892,10 @@ tab
{{< /tabs >}}
-下面的例子展示了如何使用 `emitUpdateWithRetract` 方法来只发送更新的数据。为了只发送更新的结果，accumulator
保存了上一次的最大的2个值，也保存了当前最大的2个值。注意：如果 TopN 中的 n
非常大，这种既保存上次的结果，也保存当前的结果的方式不太高效。一种解决这种问题的方式是把输入数据直接存储到 `accumulator` 中，然后在调用
`emitUpdateWithRetract` 方法时再进行计算。
+下面的例子展示了如何使用 `emitUpdateWithRetract` 方法来只发送更新的数据。为了只发送更新的结果，accumulator
保存了上一次的最大的2个值，也保存了当前最大的2个值。
+{{< hint info >}}
+注意：请不要在 `emitUpdateWithRetract` 方法中更新 accumulator，因为在调用
`function#emitUpdateWithRetract` 之后，`GroupTableAggFunction` 不会重新调用
`function#getAccumulators` 来将最新的 accumulator 更新到状态中。
+{{< /hint >}}
{{< tabs "e0d841fe-8d95-4706-9e19-e76141171966" >}}
{{< tab "Java" >}}
@@ -1923,6 +1926,8 @@ public static class Top2 extends
TableAggregateFunction<Tuple2<Integer, Integer>
}
public void accumulate(Top2Accum acc, Integer v) {
+ acc.oldFirst = acc.first;
+ acc.oldSecond = acc.second;
if (v > acc.first) {
acc.second = acc.first;
acc.first = v;
@@ -1938,7 +1943,6 @@ public static class Top2 extends
TableAggregateFunction<Tuple2<Integer, Integer>
out.retract(Tuple2.of(acc.oldFirst, 1));
}
out.collect(Tuple2.of(acc.first, 1));
- acc.oldFirst = acc.first;
}
if (!acc.second.equals(acc.oldSecond)) {
@@ -1947,7 +1951,6 @@ public static class Top2 extends
TableAggregateFunction<Tuple2<Integer, Integer>
out.retract(Tuple2.of(acc.oldSecond, 2));
}
out.collect(Tuple2.of(acc.second, 2));
- acc.oldSecond = acc.second;
}
}
}
@@ -1997,6 +2000,8 @@ class Top2 extends
TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum
}
def accumulate(acc: Top2Accum, v: Int) {
+ acc.oldFirst = acc.first
+ acc.oldSecond = acc.second
if (v > acc.first) {
acc.second = acc.first
acc.first = v
@@ -2015,7 +2020,6 @@ class Top2 extends
TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum
out.retract(JTuple2.of(acc.oldFirst, 1))
}
out.collect(JTuple2.of(acc.first, 1))
- acc.oldFirst = acc.first
}
if (acc.second != acc.oldSecond) {
// if there is an update, retract old value then emit new value.
@@ -2023,7 +2027,6 @@ class Top2 extends
TableAggregateFunction[JTuple2[JInteger, JInteger], Top2Accum
out.retract(JTuple2.of(acc.oldSecond, 2))
}
out.collect(JTuple2.of(acc.second, 2))
- acc.oldSecond = acc.second
}
}
}
diff --git a/docs/content/docs/dev/table/functions/udfs.md
b/docs/content/docs/dev/table/functions/udfs.md
index 45f981f430c..b0c095f26b8 100644
--- a/docs/content/docs/dev/table/functions/udfs.md
+++ b/docs/content/docs/dev/table/functions/udfs.md
@@ -1778,9 +1778,9 @@ def emitUpdateWithRetract(accumulator: ACC, out:
RetractableCollector[T]): Unit
The following example shows how to use the `emitUpdateWithRetract(...)` method
to emit only incremental
updates. In order to do so, the accumulator keeps both the old and new top 2
values.
-If the N of Top N is big, it might be inefficient to keep both the old and new
values. One way to
-solve this case is to store only the input record in the accumulator in
`accumulate` method and then perform
-a calculation in `emitUpdateWithRetract`.
+{{< hint info >}}
+Note: Do not update accumulator within `emitUpdateWithRetract` because after
`function#emitUpdateWithRetract` is invoked, `GroupTableAggFunction` will not
re-invoke `function#getAccumulators` to update the latest accumulator to state.
+{{< /hint >}}
{{< tabs "043e94c6-05b5-4800-9e5f-7d11235f3a11" >}}
{{< tab "Java" >}}
@@ -1809,6 +1809,8 @@ public static class Top2WithRetract
}
public void accumulate(Top2WithRetractAccumulator acc, Integer v) {
+ acc.oldFirst = acc.first;
+ acc.oldSecond = acc.second;
if (v > acc.first) {
acc.second = acc.first;
acc.first = v;
@@ -1826,7 +1828,6 @@ public static class Top2WithRetract
out.retract(Tuple2.of(acc.oldFirst, 1));
}
out.collect(Tuple2.of(acc.first, 1));
- acc.oldFirst = acc.first;
}
if (!acc.second.equals(acc.oldSecond)) {
// if there is an update, retract the old value then emit a new value
@@ -1834,7 +1835,6 @@ public static class Top2WithRetract
out.retract(Tuple2.of(acc.oldSecond, 2));
}
out.collect(Tuple2.of(acc.second, 2));
- acc.oldSecond = acc.second;
}
}
}
@@ -1866,6 +1866,8 @@ class Top2WithRetract
}
def accumulate(acc: Top2WithRetractAccumulator, value: Integer): Unit = {
+ acc.oldFirst = acc.first
+ acc.oldSecond = acc.second
if (value > acc.first) {
acc.second = acc.first
acc.first = value
@@ -1884,7 +1886,6 @@ class Top2WithRetract
out.retract(Tuple2.of(acc.oldFirst, 1))
}
out.collect(Tuple2.of(acc.first, 1))
- acc.oldFirst = acc.first
}
if (!acc.second.equals(acc.oldSecond)) {
// if there is an update, retract the old value then emit a new value
@@ -1892,7 +1893,6 @@ class Top2WithRetract
out.retract(Tuple2.of(acc.oldSecond, 2))
}
out.collect(Tuple2.of(acc.second, 2))
- acc.oldSecond = acc.second
}
}
}