[ https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308030#comment-16308030 ]
ASF GitHub Bot commented on FLINK-8345:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5230#discussion_r159223798
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java ---
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link TwoInputStreamOperator} for executing {@link KeyedBroadcastProcessFunction}.
+ *
+ * @param <KS> The key type of the input keyed stream.
+ * @param <IN1> The input type of the broadcast side.
+ * @param <IN2> The input type of the keyed (non-broadcast) side.
+ * @param <K> The key type of the elements in the {@link BroadcastState}.
+ * @param <V> The value type of the elements in the {@link BroadcastState}.
+ * @param <OUT> The output type of the operator.
+ */
+@Internal
+public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, K, V, OUT>
+ extends AbstractUdfStreamOperator<OUT,
KeyedBroadcastProcessFunction<KS, IN1, IN2, K, V, OUT>>
+ implements TwoInputStreamOperator<IN1, IN2, OUT>,
Triggerable<K, VoidNamespace> {
+
+ private static final long serialVersionUID = 5926499536290284870L;
+
+ private final MapStateDescriptor<K, V> broadcastStateDescriptor;
+
+ private transient TimestampedCollector<OUT> collector;
+
+ private transient BroadcastState<K, V> broadcastState;
+
+ private transient ReadWriteContextImpl rwContext;
+
+ private transient ReadOnlyContextImpl rContext;
+
+ private transient OnTimerContextImpl onTimerContext;
+
+ public CoBroadcastWithKeyedOperator(
+ final KeyedBroadcastProcessFunction<KS, IN1, IN2, K, V,
OUT> function,
+ final MapStateDescriptor<K, V>
broadcastStateDescriptor) {
+ super(function);
+ this.broadcastStateDescriptor =
Preconditions.checkNotNull(broadcastStateDescriptor);
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+
+ InternalTimerService<VoidNamespace> internalTimerService =
+ getInternalTimerService("user-timers",
VoidNamespaceSerializer.INSTANCE, this);
+
+ TimerService timerService = new
SimpleTimerService(internalTimerService);
+
+ collector = new TimestampedCollector<>(output);
+ broadcastState =
getOperatorStateBackend().getBroadcastState(broadcastStateDescriptor);
+
+ rwContext = new ReadWriteContextImpl(getKeyedStateBackend(),
userFunction, broadcastState, timerService);
+ rContext = new ReadOnlyContextImpl(userFunction,
broadcastState, timerService);
+ onTimerContext = new OnTimerContextImpl(userFunction,
broadcastState, timerService);
+ }
+
+ @Override
+ public void processElement1(StreamRecord<IN1> element) throws Exception
{
--- End diff --
The operators still have the mapping `broadcast -> first input`,
`main input -> second input`. This should be brought in line with how the API
now works.
> Iterate over keyed state on broadcast side of connect with broadcast.
> ---------------------------------------------------------------------
>
> Key: FLINK-8345
> URL: https://issues.apache.org/jira/browse/FLINK-8345
> Project: Flink
> Issue Type: New Feature
> Components: Streaming
> Affects Versions: 1.5.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.5.0
>
>
--
This message was sent by Atlassian JIRA (v6.4.14#64029)