This is an automated email from the ASF dual-hosted git repository.
hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 907d0f32126 [FLINK-33881][state] Avoid copy and update value in
TtlListState#getUnexpiredOrNull
907d0f32126 is described below
commit 907d0f32126b9f8acfc80f3f4098e71cb37f0e37
Author: Jinzhong Li <[email protected]>
AuthorDate: Tue Jan 9 18:02:43 2024 +0800
[FLINK-33881][state] Avoid copy and update value in
TtlListState#getUnexpiredOrNull
---
.../flink/runtime/state/ttl/TtlListState.java | 21 ++++++++++++++++++---
1 file changed, 18 insertions(+), 3 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
index 797127582e1..fa4d50fa6b0 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
@@ -109,12 +109,27 @@ class TtlListState<K, N, T>
}
long currentTimestamp = timeProvider.currentTimestamp();
- List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
TypeSerializer<TtlValue<T>> elementSerializer =
((ListSerializer<TtlValue<T>>) original.getValueSerializer())
.getElementSerializer();
- for (TtlValue<T> ttlValue : ttlValues) {
- if (!TtlUtils.expired(ttlValue, ttl, currentTimestamp)) {
+ int firstExpireElementIndex = -1;
+ for (int i = 0; i < ttlValues.size(); i++) {
+ TtlValue<T> ttlValue = ttlValues.get(i);
+ if (TtlUtils.expired(ttlValue, ttl, currentTimestamp)) {
+ firstExpireElementIndex = i;
+ break;
+ }
+ }
+ if (firstExpireElementIndex == -1) {
+ return ttlValues;
+ }
+
+ List<TtlValue<T>> unexpired = new ArrayList<>(ttlValues.size());
+ for (int i = 0; i < ttlValues.size(); i++) {
+ TtlValue<T> ttlValue = ttlValues.get(i);
+ if (i < firstExpireElementIndex
+ || (i > firstExpireElementIndex
+ && !TtlUtils.expired(ttlValue, ttl,
currentTimestamp))) {
// we have to do the defensive copy to update the value
unexpired.add(elementSerializer.copy(ttlValue));
}