Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r159223798 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java --- @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.co; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.BroadcastState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.KeyedStateFunction; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.SimpleTimerService; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.OutputTag; +import org.apache.flink.util.Preconditions; + +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A {@link TwoInputStreamOperator} for executing {@link KeyedBroadcastProcessFunction}. + * + * @param <KS> The key type of the input keyed stream. + * @param <IN1> The input type of the broadcast side. + * @param <IN2> The input type of the keyed (non-broadcast) side. + * @param <K> The key type of the elements in the {@link BroadcastState}. + * @param <V> The value type of the elements in the {@link BroadcastState}. + * @param <OUT> The output type of the operator. + */ +@Internal +public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, K, V, OUT> + extends AbstractUdfStreamOperator<OUT, KeyedBroadcastProcessFunction<KS, IN1, IN2, K, V, OUT>> + implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> { + + private static final long serialVersionUID = 5926499536290284870L; + + private final MapStateDescriptor<K, V> broadcastStateDescriptor; + + private transient TimestampedCollector<OUT> collector; + + private transient BroadcastState<K, V> broadcastState; + + private transient ReadWriteContextImpl rwContext; + + private transient ReadOnlyContextImpl rContext; + + private transient OnTimerContextImpl onTimerContext; + + public CoBroadcastWithKeyedOperator( + final KeyedBroadcastProcessFunction<KS, IN1, IN2, K, V, OUT> function, + final MapStateDescriptor<K, V> broadcastStateDescriptor) { + super(function); + this.broadcastStateDescriptor = Preconditions.checkNotNull(broadcastStateDescriptor); + } + + @Override + public void open() throws Exception { + super.open(); + + InternalTimerService<VoidNamespace> internalTimerService = + getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this); + + TimerService timerService = new SimpleTimerService(internalTimerService); + + collector = new TimestampedCollector<>(output); + broadcastState = getOperatorStateBackend().getBroadcastState(broadcastStateDescriptor); + + rwContext = new ReadWriteContextImpl(getKeyedStateBackend(), userFunction, broadcastState, timerService); + rContext = new ReadOnlyContextImpl(userFunction, broadcastState, timerService); + onTimerContext = new OnTimerContextImpl(userFunction, broadcastState, timerService); + } + + @Override + public void processElement1(StreamRecord<IN1> element) throws Exception { --- End diff -- The operators still have the mapping `broadcast -> first input`, `main input -> second input`. This should be brought in line with how the API now works.
---