Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6333#discussion_r202518473
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java
 ---
    @@ -63,54 +72,46 @@ public 
RegisteredBroadcastBackendStateMetaInfo(RegisteredBroadcastBackendStateMe
        }
     
        @SuppressWarnings("unchecked")
    -   public RegisteredBroadcastBackendStateMetaInfo(@Nonnull 
StateMetaInfoSnapshot snapshot) {
    +   public RegisteredBroadcastStateBackendMetaInfo(@Nonnull 
StateMetaInfoSnapshot snapshot) {
                this(
                        snapshot.getName(),
                        OperatorStateHandle.Mode.valueOf(
                                
snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)),
    -                   (TypeSerializer<K>) 
snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER),
    -                   (TypeSerializer<V>) 
snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));
    +                   (TypeSerializer<K>) Preconditions.checkNotNull(
    +                           
snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER)),
    +                   (TypeSerializer<V>) Preconditions.checkNotNull(
    +                           
snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)));
                
Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == 
snapshot.getBackendStateType());
        }
     
        /**
         * Creates a deep copy of the itself.
         */
    -   public RegisteredBroadcastBackendStateMetaInfo<K, V> deepCopy() {
    -           return new RegisteredBroadcastBackendStateMetaInfo<>(this);
    +   @Nonnull
    +   public RegisteredBroadcastStateBackendMetaInfo<K, V> deepCopy() {
    +           return new RegisteredBroadcastStateBackendMetaInfo<>(this);
        }
     
        @Nonnull
        @Override
        public StateMetaInfoSnapshot snapshot() {
    -           Map<String, String> optionsMap = Collections.singletonMap(
    -                   
StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
    -                   assignmentMode.toString());
    -           Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2);
    -           Map<String, TypeSerializerConfigSnapshot> 
serializerConfigSnapshotsMap = new HashMap<>(2);
    -           String keySerializerKey = 
StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString();
    -           String valueSerializerKey = 
StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
    -           serializerMap.put(keySerializerKey, keySerializer.duplicate());
    -           serializerConfigSnapshotsMap.put(keySerializerKey, 
keySerializer.snapshotConfiguration());
    -           serializerMap.put(valueSerializerKey, 
valueSerializer.duplicate());
    -           serializerConfigSnapshotsMap.put(valueSerializerKey, 
valueSerializer.snapshotConfiguration());
    -
    -           return new StateMetaInfoSnapshot(
    -                   name,
    -                   StateMetaInfoSnapshot.BackendStateType.BROADCAST,
    -                   optionsMap,
    -                   serializerConfigSnapshotsMap,
    -                   serializerMap);
    +           if (precomputedSnapshot == null) {
    +                   precomputedSnapshot = precomputeSnapshot();
    +           }
    +           return precomputedSnapshot;
    --- End diff --
    
    What if the serializers are not all immutable? Do we need an `immutable` 
field for that, so that we return the `precomputedSnapshot` only when it is true?


---

Reply via email to