[
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308034#comment-16308034
]
ASF GitHub Bot commented on FLINK-8345:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5230#discussion_r159224065
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+/**
+ * A function to be applied to a
+ * {@link
org.apache.flink.streaming.api.datastream.BroadcastConnectedStream
BroadcastConnectedStream} that
+ * connects {@link
org.apache.flink.streaming.api.datastream.BroadcastStream BroadcastStream},
i.e. a stream
+ * with broadcast state, with a {@link
org.apache.flink.streaming.api.datastream.KeyedStream KeyedStream}.
+ *
+ * <p>The stream with the broadcast state can be created using the
+ * {@link
org.apache.flink.streaming.api.datastream.KeyedStream#broadcast(MapStateDescriptor)
+ * keyedStream.broadcast(MapStateDescriptor)} method.
+ *
+ * <p>The user has to implement two methods:
+ * <ol>
+ * <li>the {@link #processElementOnBroadcastSide(Object,
KeyedReadWriteContext, Collector)} which will be applied to
+ * each element in the broadcast side
+ * <li> and the {@link #processElement(Object, KeyedReadOnlyContext,
Collector)} which will be applied to the
+ * non-broadcasted/keyed side.
+ * </ol>
+ *
+ * <p>The {@code processElementOnBroadcastSide()} takes as an argument
(among others) a context that allows it to
+ * read/write to the broadcast state and also apply a transformation to
all (local) keyed states, while the
+ * {@code processElement()} has read-only access to the broadcast state,
but can read/write to the keyed state and
+ * register timers.
+ *
+ * @param <KS> The key type of the input keyed stream.
+ * @param <IN1> The input type of the broadcast side.
+ * @param <IN2> The input type of the keyed (non-broadcast) side.
+ * @param <K> The key type of the elements in the {@link
org.apache.flink.api.common.state.BroadcastState BroadcastState}.
+ * @param <V> The value type of the elements in the {@link
org.apache.flink.api.common.state.BroadcastState BroadcastState}.
+ * @param <OUT> The output type of the operator.
+ */
+@PublicEvolving
+public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, K, V,
OUT> extends BaseBroadcastProcessFunction<K, V> {
+
+ private static final long serialVersionUID = -2584726797564976453L;
+
+ /**
+ * This method is called for each element in the
+ * {@link org.apache.flink.streaming.api.datastream.BroadcastStream
broadcast stream}.
+ *
+ * <p>It can output zero or more elements using the {@link Collector}
parameter,
+ * query the current processing/event time, and also query and update
the internal
+ * {@link org.apache.flink.api.common.state.BroadcastState
BroadcastState}. In addition, it
+ * can register a {@link KeyedStateFunction function} to be applied to
all keyed states on
+ * the local partition. These can be done through the provided {@link
ReadWriteContext}.
+ * The context is only valid during the invocation of this method, do
not store it.
+ *
+ * @param value The stream element.
+ * @param ctx A {@link ReadWriteContext} that allows querying the
timestamp of the element,
+ * querying the current processing/event time and updating
the broadcast state.
+ * In addition, it allows the registration of a {@link
KeyedStateFunction function}
+ * to be applied to all keyed state with a given {@link
StateDescriptor} on the local partition.
+ * The context is only valid during the invocation of this
method, do not store it.
+ * @param out The collector to emit resulting elements to
+ * @throws Exception The function may throw exceptions which cause the
streaming program
+ * to fail and go into recovery.
+ */
+ public abstract void processElementOnBroadcastSide(final IN1 value,
final KeyedReadWriteContext ctx, final Collector<OUT> out) throws Exception;
--- End diff --
This could be called `processBroadcastElement()` to make it more concise.
> Iterate over keyed state on broadcast side of connect with broadcast.
> ---------------------------------------------------------------------
>
> Key: FLINK-8345
> URL: https://issues.apache.org/jira/browse/FLINK-8345
> Project: Flink
> Issue Type: New Feature
> Components: Streaming
> Affects Versions: 1.5.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.5.0
>
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)