[
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339358#comment-16339358
]
ASF GitHub Bot commented on FLINK-8345:
---------------------------------------
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/5230#discussion_r163870666
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---
@@ -513,17 +630,100 @@ public void addAll(List<S> values) throws Exception {
}
}
+    private <K, V> BroadcastState<K, V> getBroadcastState(
+            final MapStateDescriptor<K, V> stateDescriptor,
+            final OperatorStateHandle.Mode mode) throws StateMigrationException {
+
+        Preconditions.checkNotNull(stateDescriptor);
+        String name = Preconditions.checkNotNull(stateDescriptor.getName());
+
+        @SuppressWarnings("unchecked")
+        BackendWritableBroadcastState<K, V> previous = (BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(name);
+        if (previous != null) {
+            checkStateNameAndMode(
+                    previous.getStateMetaInfo().getName(),
+                    name,
+                    previous.getStateMetaInfo().getAssignmentMode(),
+                    mode);
+            return previous;
+        }
+
+        stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
+        TypeSerializer<K> broadcastStateKeySerializer = Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
+        TypeSerializer<V> broadcastStateValueSerializer = Preconditions.checkNotNull(stateDescriptor.getValueSerializer());
+
+        BackendWritableBroadcastState<K, V> broadcastState = (BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name);
+
+        if (broadcastState == null) {
+            broadcastState = new HeapBroadcastState<>(
+                    new RegisteredBroadcastBackendStateMetaInfo<>(
+                            name,
+                            mode,
+                            broadcastStateKeySerializer,
+                            broadcastStateValueSerializer));
+            registeredBroadcastStates.put(name, broadcastState);
+        } else {
--- End diff --
No, because of the `accessedBroadcastStatesByName.get(name)` lookup above
(line 641).
As soon as we create or restore the broadcast state, we put it into that map
(line 708). The next time we access the state, we hit the cache
(`accessedBroadcastStatesByName`), so we do not go through the creation/check
phase again.
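
To illustrate the cache-first lookup described above, here is a minimal, self-contained sketch. It is not the Flink implementation: the class `BroadcastStateCacheSketch`, the `slowPathRuns` counter, and the use of plain `Map<String, String>` values as stand-ins for broadcast state are all assumptions made for illustration. Only the two map names mirror the fields in the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the backend; NOT Flink's DefaultOperatorStateBackend.
class BroadcastStateCacheSketch {

    // States that were created or restored (plays the role of registeredBroadcastStates).
    private final Map<String, Map<String, String>> registeredBroadcastStates = new HashMap<>();

    // States that were already handed out (plays the role of accessedBroadcastStatesByName).
    private final Map<String, Map<String, String>> accessedBroadcastStatesByName = new HashMap<>();

    // Illustration only: counts how often the creation/check path is taken.
    private int slowPathRuns = 0;

    Map<String, String> getBroadcastState(String name) {
        // Fast path: a state that was accessed before is returned straight from the cache.
        Map<String, String> previous = accessedBroadcastStatesByName.get(name);
        if (previous != null) {
            return previous;
        }

        // Slow path: reached at most once per state name.
        slowPathRuns++;
        Map<String, String> state = registeredBroadcastStates.get(name);
        if (state == null) {
            // No registered state yet: create and register a new one.
            state = new HashMap<>();
            registeredBroadcastStates.put(name, state);
        }
        // A real backend would run its compatibility checks for restored state here.

        // Populate the cache so that later calls take the fast path.
        accessedBroadcastStatesByName.put(name, state);
        return state;
    }

    int getSlowPathRuns() {
        return slowPathRuns;
    }
}
```

In this sketch, calling `getBroadcastState("rules")` twice returns the same instance and takes the slow path only on the first call, which is the behavior the comment relies on.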
> Iterate over keyed state on broadcast side of connect with broadcast.
> ---------------------------------------------------------------------
>
> Key: FLINK-8345
> URL: https://issues.apache.org/jira/browse/FLINK-8345
> Project: Flink
> Issue Type: New Feature
> Components: Streaming
> Affects Versions: 1.5.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Priority: Major
> Fix For: 1.5.0
>
>