[ 
https://issues.apache.org/jira/browse/FLINK-9701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541682#comment-16541682
 ] 

ASF GitHub Bot commented on FLINK-9701:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6313#discussion_r202042548
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java 
---
    @@ -84,52 +86,89 @@ public boolean contains(UK key) throws Exception {
     
        @Override
        public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
    -           return entriesStream()::iterator;
    +           return entries(e -> e);
        }
     
    -   private Stream<Map.Entry<UK, UV>> entriesStream() throws Exception {
    +   private <R> Iterable<R> entries(
    +           Function<Map.Entry<UK, UV>, R> resultMapper) throws Exception {
                Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = 
original.entries();
    -           withTs = withTs == null ? Collections.emptyList() : withTs;
    -           return StreamSupport
    -                   .stream(withTs.spliterator(), false)
    -                   .filter(this::unexpiredAndUpdateOrCleanup)
    -                   .map(TtlMapState::unwrapWithoutTs);
    -   }
    -
    -   private boolean unexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>> 
e) {
    -           UV unexpiredValue;
    -           try {
    -                   unexpiredValue = getWithTtlCheckAndUpdate(
    -                           e::getValue,
    -                           v -> original.put(e.getKey(), v),
    -                           () -> original.remove(e.getKey()));
    -           } catch (Exception ex) {
    -                   throw new FlinkRuntimeException(ex);
    -           }
    -           return unexpiredValue != null;
    -   }
    -
    -   private static <UK, UV> Map.Entry<UK, UV> unwrapWithoutTs(Map.Entry<UK, 
TtlValue<UV>> e) {
    -           return new AbstractMap.SimpleEntry<>(e.getKey(), 
e.getValue().getUserValue());
    +           return () -> new EntriesIterator<>(withTs == null ? 
Collections.emptyList() : withTs, resultMapper);
        }
     
        @Override
        public Iterable<UK> keys() throws Exception {
    -           return entriesStream().map(Map.Entry::getKey)::iterator;
    +           return entries(Map.Entry::getKey);
        }
     
        @Override
        public Iterable<UV> values() throws Exception {
    -           return entriesStream().map(Map.Entry::getValue)::iterator;
    +           return entries(Map.Entry::getValue);
        }
     
        @Override
        public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {
    -           return entriesStream().iterator();
    +           return entries().iterator();
        }
     
        @Override
        public void clear() {
                original.clear();
        }
    +
    +   private class EntriesIterator<R> implements Iterator<R> {
    +           private final Iterator<Map.Entry<UK, TtlValue<UV>>> 
originalIterator;
    +           private final Function<Map.Entry<UK, UV>, R> resultMapper;
    +           private Map.Entry<UK, UV> nextUnexpired = null;
    +           private boolean rightAfterNextIsCalled = false;
    +
    +           private EntriesIterator(
    +                   @Nonnull Iterable<Map.Entry<UK, TtlValue<UV>>> withTs,
    +                   @Nonnull Function<Map.Entry<UK, UV>, R> resultMapper) {
    +                   this.originalIterator = withTs.iterator();
    +                   this.resultMapper = resultMapper;
    +           }
    +
    +           @Override
    +           public boolean hasNext() {
    +                   rightAfterNextIsCalled = false;
    +                   while (nextUnexpired == null && 
originalIterator.hasNext()) {
    +                           nextUnexpired = 
getUnexpiredAndUpdateOrCleanup(originalIterator.next());
    +                   }
    +                   return nextUnexpired != null;
    +           }
    +
    +           @Override
    +           public R next() {
    +                   if (hasNext()) {
    +                           rightAfterNextIsCalled = true;
    +                           R result = resultMapper.apply(nextUnexpired);
    +                           nextUnexpired = null;
    +                           return result;
    +                   }
    +                   throw new NoSuchElementException();
    +           }
    +
    +           @Override
    +           public void remove() {
    +                   if (rightAfterNextIsCalled) {
    --- End diff --
    
    I agree, seems like there is no good solution for this.


> Activate TTL in state descriptors
> ---------------------------------
>
>                 Key: FLINK-9701
>                 URL: https://issues.apache.org/jira/browse/FLINK-9701
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Andrey Zagrebin
>            Assignee: Andrey Zagrebin
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to