Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5230#discussion_r159224065 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java --- @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.co; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.runtime.state.KeyedStateFunction; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.util.Collector; + +/** + * A function to be applied to a + * {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream BroadcastConnectedStream} that + * connects {@link org.apache.flink.streaming.api.datastream.BroadcastStream BroadcastStream}, i.e. a stream + * with broadcast state, with a {@link org.apache.flink.streaming.api.datastream.KeyedStream KeyedStream}. + * + * <p>The stream with the broadcast state can be created using the + * {@link org.apache.flink.streaming.api.datastream.KeyedStream#broadcast(MapStateDescriptor) + * keyedStream.broadcast(MapStateDescriptor)} method. + * + * <p>The user has to implement two methods: + * <ol> + * <li>the {@link #processElementOnBroadcastSide(Object, KeyedReadWriteContext, Collector)} which will be applied to + * each element in the broadcast side + * <li> and the {@link #processElement(Object, KeyedReadOnlyContext, Collector)} which will be applied to the + * non-broadcasted/keyed side. + * </ol> + * + * <p>The {@code processElementOnBroadcastSide()} takes as an argument (among others) a context that allows it to + * read/write to the broadcast state and also apply a transformation to all (local) keyed states, while the + * {@code processElement()} has read-only access to the broadcast state, but can read/write to the keyed state and + * register timers. + * + * @param <KS> The key type of the input keyed stream. + * @param <IN1> The input type of the broadcast side. + * @param <IN2> The input type of the keyed (non-broadcast) side. + * @param <K> The key type of the elements in the {@link org.apache.flink.api.common.state.BroadcastState BroadcastState}. + * @param <V> The value type of the elements in the {@link org.apache.flink.api.common.state.BroadcastState BroadcastState}. + * @param <OUT> The output type of the operator. + */ +@PublicEvolving +public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, K, V, OUT> extends BaseBroadcastProcessFunction<K, V> { + + private static final long serialVersionUID = -2584726797564976453L; + + /** + * This method is called for each element in the + * {@link org.apache.flink.streaming.api.datastream.BroadcastStream broadcast stream}. + * + * <p>It can output zero or more elements using the {@link Collector} parameter, + * query the current processing/event time, and also query and update the internal + * {@link org.apache.flink.api.common.state.BroadcastState BroadcastState}. In addition, it + * can register a {@link KeyedStateFunction function} to be applied to all keyed states on + * the local partition. These can be done through the provided {@link ReadWriteContext}. + * The context is only valid during the invocation of this method, do not store it. + * + * @param value The stream element. + * @param ctx A {@link ReadWriteContext} that allows querying the timestamp of the element, + * querying the current processing/event time and updating the broadcast state. + * In addition, it allows the registration of a {@link KeyedStateFunction function} + * to be applied to all keyed state with a given {@link StateDescriptor} on the local partition. + * The context is only valid during the invocation of this method, do not store it. + * @param out The collector to emit resulting elements to + * @throws Exception The function may throw exceptions which cause the streaming program + * to fail and go into recovery. + */ + public abstract void processElementOnBroadcastSide(final IN1 value, final KeyedReadWriteContext ctx, final Collector<OUT> out) throws Exception; --- End diff -- This could be called `processBroadcastElement()` to make it more concise.
---