[ 
https://issues.apache.org/jira/browse/FLINK-10325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16615185#comment-16615185
 ] 

ASF GitHub Bot commented on FLINK-10325:
----------------------------------------

asfgit closed pull request #6683: [FLINK-10325] [State TTL] Refactor 
TtlListState to use only loops, no java stream API for performance
URL: https://github.com/apache/flink/pull/6683
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request was opened from a fork, GitHub does not display
its diff once it has been merged, so the full diff is reproduced below:

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
index cb64df70dd2..8a09e36a2ad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
@@ -23,13 +23,12 @@
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.util.Preconditions;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 
 /**
  * This class wraps list state with TTL logic.
@@ -73,11 +72,14 @@ public void addAll(List<T> values) throws Exception {
                return () -> new IteratorWithCleanup(finalResult.iterator());
        }
 
-       private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
-               List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
-                       .filter(v -> !expired(v))
-                       .map(this::rewrapWithNewTs)
-                       .collect(Collectors.toList());
+       private void updateTs(List<TtlValue<T>> ttlValues) throws Exception {
+               List<TtlValue<T>> unexpiredWithUpdatedTs = new ArrayList<>();
+               long currentTimestamp = timeProvider.currentTimestamp();
+               for (TtlValue<T> ttlValue : ttlValues) {
+                       if (!TtlUtils.expired(ttlValue, ttl, currentTimestamp)) 
{
+                               
unexpiredWithUpdatedTs.add(TtlUtils.wrapWithTs(ttlValue.getUserValue(), 
currentTimestamp));
+                       }
+               }
                if (!unexpiredWithUpdatedTs.isEmpty()) {
                        original.update(unexpiredWithUpdatedTs);
                }
@@ -105,8 +107,15 @@ public void mergeNamespaces(N target, Collection<N> 
sources) throws Exception {
        }
 
        private <E> List<E> collect(Iterable<E> iterable) {
-               return iterable instanceof List ? (List<E>) iterable :
-                       StreamSupport.stream(iterable.spliterator(), 
false).collect(Collectors.toList());
+               if (iterable instanceof List) {
+                       return (List<E>) iterable;
+               } else {
+                       List<E> list = new ArrayList<>();
+                       for (E element : iterable) {
+                               list.add(element);
+                       }
+                       return list;
+               }
        }
 
        @Override
@@ -116,7 +125,12 @@ public void updateInternal(List<T> valueToStore) throws 
Exception {
        }
 
        private List<TtlValue<T>> withTs(List<T> values) {
-               return 
values.stream().map(this::wrapWithTs).collect(Collectors.toList());
+               List<TtlValue<T>> withTs = new ArrayList<>();
+               for (T value : values) {
+                       Preconditions.checkNotNull(value, "You cannot have null 
element in a ListState.");
+                       withTs.add(wrapWithTs(value));
+               }
+               return withTs;
        }
 
        private class IteratorWithCleanup implements Iterator<T> {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
index f92e8e4d95c..160dbeb71e9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
@@ -68,9 +68,10 @@ public void putAll(Map<UK, UV> map) throws Exception {
                        return;
                }
                Map<UK, TtlValue<UV>> ttlMap = new HashMap<>(map.size());
+               long currentTimestamp = timeProvider.currentTimestamp();
                for (Map.Entry<UK, UV> entry : map.entrySet()) {
                        UK key = entry.getKey();
-                       ttlMap.put(key, wrapWithTs(entry.getValue()));
+                       ttlMap.put(key, TtlUtils.wrapWithTs(entry.getValue(), 
currentTimestamp));
                }
                original.putAll(ttlMap);
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java
index 9d9e5e1a10d..96fdff64d3f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java
@@ -21,11 +21,19 @@
 /** Common functions related to State TTL. */
 class TtlUtils {
        static <V> boolean expired(TtlValue<V> ttlValue, long ttl, 
TtlTimeProvider timeProvider) {
-               return ttlValue != null && 
expired(ttlValue.getLastAccessTimestamp(), ttl, timeProvider);
+               return expired(ttlValue, ttl, timeProvider.currentTimestamp());
+       }
+
+       static <V> boolean expired(TtlValue<V> ttlValue, long ttl, long 
currentTimestamp) {
+               return ttlValue != null && 
expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp);
        }
 
        static boolean expired(long ts, long ttl, TtlTimeProvider timeProvider) 
{
-               return getExpirationTimestamp(ts, ttl) <= 
timeProvider.currentTimestamp();
+               return expired(ts, ttl, timeProvider.currentTimestamp());
+       }
+
+       private static boolean expired(long ts, long ttl, long 
currentTimestamp) {
+               return getExpirationTimestamp(ts, ttl) <= currentTimestamp;
        }
 
        private static long getExpirationTimestamp(long ts, long ttl) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
index 99f8b0bde7d..a8bcadf9d19 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
@@ -28,6 +28,8 @@
  * @param <T> Type of the user value of state with TTL
  */
 class TtlValue<T> implements Serializable {
+       private static final long serialVersionUID = 5221129704201125020L;
+
        private final T userValue;
        private final long lastAccessTimestamp;
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> [State TTL] Refactor TtlListState to use only loops, no java stream API for 
> performance
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-10325
>                 URL: https://issues.apache.org/jira/browse/FLINK-10325
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.6.0
>            Reporter: Andrey Zagrebin
>            Assignee: Andrey Zagrebin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.1, 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to