Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202518473 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastStateBackendMetaInfo.java --- @@ -63,54 +72,46 @@ public RegisteredBroadcastBackendStateMetaInfo(RegisteredBroadcastBackendStateMe } @SuppressWarnings("unchecked") - public RegisteredBroadcastBackendStateMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) { + public RegisteredBroadcastStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) { this( snapshot.getName(), OperatorStateHandle.Mode.valueOf( snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)), - (TypeSerializer<K>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER), - (TypeSerializer<V>) snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)); + (TypeSerializer<K>) Preconditions.checkNotNull( + snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER)), + (TypeSerializer<V>) Preconditions.checkNotNull( + snapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER))); Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.BROADCAST == snapshot.getBackendStateType()); } /** * Creates a deep copy of the itself. 
*/ - public RegisteredBroadcastBackendStateMetaInfo<K, V> deepCopy() { - return new RegisteredBroadcastBackendStateMetaInfo<>(this); + @Nonnull + public RegisteredBroadcastStateBackendMetaInfo<K, V> deepCopy() { + return new RegisteredBroadcastStateBackendMetaInfo<>(this); } @Nonnull @Override public StateMetaInfoSnapshot snapshot() { - Map<String, String> optionsMap = Collections.singletonMap( - StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(), - assignmentMode.toString()); - Map<String, TypeSerializer<?>> serializerMap = new HashMap<>(2); - Map<String, TypeSerializerConfigSnapshot> serializerConfigSnapshotsMap = new HashMap<>(2); - String keySerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER.toString(); - String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(); - serializerMap.put(keySerializerKey, keySerializer.duplicate()); - serializerConfigSnapshotsMap.put(keySerializerKey, keySerializer.snapshotConfiguration()); - serializerMap.put(valueSerializerKey, valueSerializer.duplicate()); - serializerConfigSnapshotsMap.put(valueSerializerKey, valueSerializer.snapshotConfiguration()); - - return new StateMetaInfoSnapshot( - name, - StateMetaInfoSnapshot.BackendStateType.BROADCAST, - optionsMap, - serializerConfigSnapshotsMap, - serializerMap); + if (precomputedSnapshot == null) { + precomputedSnapshot = precomputeSnapshot(); + } + return precomputedSnapshot; --- End diff -- What if the serializers are not all immutable? Do we need an `immutable` field for that case? We would then return the cached `precomputedSnapshot` only when that field is true.
---