[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339358#comment-16339358 ]

ASF GitHub Bot commented on FLINK-8345:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5230#discussion_r163870666
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---
    @@ -513,17 +630,100 @@ public void addAll(List<S> values) throws Exception {
                }
        }
     
    +   private <K, V> BroadcastState<K, V> getBroadcastState(
    +                   final MapStateDescriptor<K, V> stateDescriptor,
    +                   final OperatorStateHandle.Mode mode) throws StateMigrationException {
    +
    +           Preconditions.checkNotNull(stateDescriptor);
    +           String name = Preconditions.checkNotNull(stateDescriptor.getName());
    +
    +           @SuppressWarnings("unchecked")
    +           BackendWritableBroadcastState<K, V> previous = (BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(name);
    +           if (previous != null) {
    +                   checkStateNameAndMode(
    +                                   previous.getStateMetaInfo().getName(),
    +                                   name,
    +                                   previous.getStateMetaInfo().getAssignmentMode(),
    +                                   mode);
    +                   return previous;
    +           }
    +
    +           stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
    +           TypeSerializer<K> broadcastStateKeySerializer = Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
    +           TypeSerializer<V> broadcastStateValueSerializer = Preconditions.checkNotNull(stateDescriptor.getValueSerializer());
    +
    +           BackendWritableBroadcastState<K, V> broadcastState = (BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name);
    +
    +           if (broadcastState == null) {
    +                   broadcastState = new HeapBroadcastState<>(
    +                                   new RegisteredBroadcastBackendStateMetaInfo<>(
    +                                                   name,
    +                                                   mode,
    +                                                   broadcastStateKeySerializer,
    +                                                   broadcastStateValueSerializer));
    +                   registeredBroadcastStates.put(name, broadcastState);
    +           } else {
    --- End diff --
    
    No, because we have the `accessedBroadcastStatesByName.get(name)` lookup above (line 641).
    
    As soon as we create or restore the broadcast state, we put it into that map (line 708). The next time we access the state, we hit the cache (`accessedBroadcastStatesByName`), so we do not go through the creation/check phase again.
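
    The lookup order described above can be illustrated with a minimal, standalone sketch. This is not the Flink implementation: the class `BroadcastStateCacheSketch`, its `restore` helper, and the `slowPathRuns` counter are hypothetical names used only for illustration, and plain `HashMap`s stand in for the real broadcast state types. The two maps loosely mirror `registeredBroadcastStates` and `accessedBroadcastStatesByName`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the "check the access cache first" pattern.
class BroadcastStateCacheSketch {

    // Stand-in for registeredBroadcastStates: everything created or restored.
    private final Map<String, Map<String, String>> registered = new HashMap<>();

    // Stand-in for accessedBroadcastStatesByName: states already handed out.
    private final Map<String, Map<String, String>> accessedByName = new HashMap<>();

    // Counts how often the creation/check phase actually runs.
    private int slowPathRuns = 0;

    // Simulates state that was restored from a checkpoint (illustrative only).
    void restore(String name, Map<String, String> contents) {
        registered.put(name, new HashMap<>(contents));
    }

    Map<String, String> getState(String name) {
        // Fast path: a state that was accessed before is returned directly.
        Map<String, String> cached = accessedByName.get(name);
        if (cached != null) {
            return cached;
        }

        // Slow path: runs only on the first access for a given name.
        slowPathRuns++;
        Map<String, String> state = registered.get(name);
        if (state == null) {
            // New state: create and register it.
            state = new HashMap<>();
            registered.put(name, state);
        }
        // For restored state, a real backend would run its compatibility
        // checks at this point; the sketch omits them.

        // Cache the state so later accesses skip the slow path.
        accessedByName.put(name, state);
        return state;
    }

    int getSlowPathRuns() {
        return slowPathRuns;
    }

    public static void main(String[] args) {
        BroadcastStateCacheSketch backend = new BroadcastStateCacheSketch();
        backend.getState("rules");
        backend.getState("rules");
        System.out.println("slow path runs: " + backend.getSlowPathRuns());
    }
}
```

    In this sketch, the second call for the same name returns the cached instance, so the creation/check branch is not reached again.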


> Iterate over keyed state on broadcast side of connect with broadcast.
> ---------------------------------------------------------------------
>
>                 Key: FLINK-8345
>                 URL: https://issues.apache.org/jira/browse/FLINK-8345
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>    Affects Versions: 1.5.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>            Priority: Major
>             Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
