Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5230#discussion_r163870666
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 ---
    @@ -513,17 +630,100 @@ public void addAll(List<S> values) throws Exception {
                }
        }
     
    +   private <K, V> BroadcastState<K, V> getBroadcastState(
    +                   final MapStateDescriptor<K, V> stateDescriptor,
    +                   final OperatorStateHandle.Mode mode) throws 
StateMigrationException {
    +
    +           Preconditions.checkNotNull(stateDescriptor);
    +           String name = 
Preconditions.checkNotNull(stateDescriptor.getName());
    +
    +           @SuppressWarnings("unchecked")
    +           BackendWritableBroadcastState<K, V> previous = 
(BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(name);
    +           if (previous != null) {
    +                   checkStateNameAndMode(
    +                                   previous.getStateMetaInfo().getName(),
    +                                   name,
    +                                   
previous.getStateMetaInfo().getAssignmentMode(),
    +                                   mode);
    +                   return previous;
    +           }
    +
    +           
stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
    +           TypeSerializer<K> broadcastStateKeySerializer = 
Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
    +           TypeSerializer<V> broadcastStateValueSerializer = 
Preconditions.checkNotNull(stateDescriptor.getValueSerializer());
    +
    +           BackendWritableBroadcastState<K, V> broadcastState = 
(BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name);
    +
    +           if (broadcastState == null) {
    +                   broadcastState = new HeapBroadcastState<>(
    +                                   new 
RegisteredBroadcastBackendStateMetaInfo<>(
    +                                                   name,
    +                                                   mode,
    +                                                   
broadcastStateKeySerializer,
    +                                                   
broadcastStateValueSerializer));
    +                   registeredBroadcastStates.put(name, broadcastState);
    +           } else {
    --- End diff --
    
    No, because we have the `accessedBroadcastStatesByName.get(name)` above 
(line 641). 
    
    As soon as we create or restore the broadcast state, we put it there (line 
708). The next time we will try to access it, we will hit the cache 
(`accessedBroadcastStatesByName`) so we will not go through the creation/check 
phase.


---

Reply via email to