Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5230#discussion_r159224065
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
 ---
    @@ -0,0 +1,178 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions.co;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.runtime.state.KeyedStateFunction;
    +import org.apache.flink.streaming.api.TimeDomain;
    +import org.apache.flink.streaming.api.TimerService;
    +import org.apache.flink.util.Collector;
    +
    +/**
    + * A function to be applied to a
    + * {@link 
org.apache.flink.streaming.api.datastream.BroadcastConnectedStream 
BroadcastConnectedStream} that
    + * connects {@link 
org.apache.flink.streaming.api.datastream.BroadcastStream BroadcastStream}, 
i.e. a stream
    + * with broadcast state, with a {@link 
org.apache.flink.streaming.api.datastream.KeyedStream KeyedStream}.
    + *
    + * <p>The stream with the broadcast state can be created using the
    + * {@link 
org.apache.flink.streaming.api.datastream.KeyedStream#broadcast(MapStateDescriptor)
    + * keyedStream.broadcast(MapStateDescriptor)} method.
    + *
    + * <p>The user has to implement two methods:
    + * <ol>
    + *     <li>the {@link #processElementOnBroadcastSide(Object, 
KeyedReadWriteContext, Collector)} which will be applied to
    + *     each element in the broadcast side
    + *     <li> and the {@link #processElement(Object, KeyedReadOnlyContext, 
Collector)} which will be applied to the
    + *     non-broadcasted/keyed side.
    + * </ol>
    + *
    + * <p>The {@code processElementOnBroadcastSide()} takes as an argument 
(among others) a context that allows it to
    + * read/write to the broadcast state and also apply a transformation to 
all (local) keyed states, while the
    + * {@code processElement()} has read-only access to the broadcast state, 
but can read/write to the keyed state and
    + * register timers.
    + *
    + * @param <KS> The key type of the input keyed stream.
    + * @param <IN1> The input type of the broadcast side.
    + * @param <IN2> The input type of the keyed (non-broadcast) side.
    + * @param <K> The key type of the elements in the {@link 
org.apache.flink.api.common.state.BroadcastState BroadcastState}.
    + * @param <V> The value type of the elements in the {@link 
org.apache.flink.api.common.state.BroadcastState BroadcastState}.
    + * @param <OUT> The output type of the operator.
    + */
    +@PublicEvolving
    +public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, K, V, 
OUT> extends BaseBroadcastProcessFunction<K, V> {
    +
    +   private static final long serialVersionUID = -2584726797564976453L;
    +
    +   /**
    +    * This method is called for each element in the
    +    * {@link org.apache.flink.streaming.api.datastream.BroadcastStream 
broadcast stream}.
    +    *
    +    * <p>It can output zero or more elements using the {@link Collector} 
parameter,
    +    * query the current processing/event time, and also query and update 
the internal
    +    * {@link org.apache.flink.api.common.state.BroadcastState 
BroadcastState}. In addition, it
    +    * can register a {@link KeyedStateFunction function} to be applied to 
all keyed states on
    +    * the local partition. These can be done through the provided {@link 
ReadWriteContext}.
    +    * The context is only valid during the invocation of this method, do 
not store it.
    +    *
    +    * @param value The stream element.
    +    * @param ctx A {@link ReadWriteContext} that allows querying the 
timestamp of the element,
    +    *            querying the current processing/event time and updating 
the broadcast state.
    +    *            In addition, it allows the registration of a {@link 
KeyedStateFunction function}
    +    *            to be applied to all keyed state with a given {@link 
StateDescriptor} on the local partition.
    +    *            The context is only valid during the invocation of this 
method, do not store it.
    +    * @param out The collector to emit resulting elements to
    +    * @throws Exception The function may throw exceptions which cause the 
streaming program
    +    *                   to fail and go into recovery.
    +    */
    +   public abstract void processElementOnBroadcastSide(final IN1 value, 
final KeyedReadWriteContext ctx, final Collector<OUT> out) throws Exception;
    --- End diff --
    
    This could be called `processBroadcastElement()` to make it more concise.


---

Reply via email to