HuangXingBo commented on a change in pull request #15149:
URL: https://github.com/apache/flink/pull/15149#discussion_r594096181
##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -167,7 +171,8 @@ def get_aggregating_state(
This state is only accessible if the function is executed on a
KeyedStream.
"""
- pass
+ raise NotImplementedError(
Review comment:
ditto
##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -127,7 +127,8 @@ def get_state(self, state_descriptor: ValueStateDescriptor) -> ValueState:
elements are distributed by the Flink runtime, the system can transparently scale out and
redistribute the state and KeyedStream.
"""
- pass
+ raise NotImplementedError(
Review comment:
I think we should refactor `RuntimeContext` into an abstract class, and then turn the `get_list` method into an abstract method.
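A minimal sketch of this suggestion in plain Python, assuming the getters keep their current names. The state descriptors are simplified to plain state names, and `InMemoryRuntimeContext` and `IncompleteRuntimeContext` are hypothetical classes used only to exercise the ABC; they are not part of PyFlink.

```python
from abc import ABC, abstractmethod


class RuntimeContext(ABC):
    """Sketch of RuntimeContext as an abstract class (descriptors simplified to names)."""

    @abstractmethod
    def get_state(self, name: str):
        """Return the value state registered under `name`; keyed streams only."""

    @abstractmethod
    def get_list_state(self, name: str):
        """Return the list state registered under `name`; keyed streams only."""


class InMemoryRuntimeContext(RuntimeContext):
    """Hypothetical concrete context backed by dicts, used only to exercise the ABC."""

    def __init__(self):
        self._values = {}
        self._lists = {}

    def get_state(self, name: str):
        # A one-slot dict stands in for a ValueState handle.
        return self._values.setdefault(name, {"value": None})

    def get_list_state(self, name: str):
        # A plain list stands in for a ListState handle.
        return self._lists.setdefault(name, [])


class IncompleteRuntimeContext(RuntimeContext):
    """Forgets get_list_state, so Python refuses to instantiate it."""

    def get_state(self, name: str):
        return None


def can_instantiate(cls) -> bool:
    """True if `cls()` succeeds, False if abstract methods are still missing."""
    try:
        cls()
    except TypeError:
        return False
    return True


print(can_instantiate(RuntimeContext))            # False
print(can_instantiate(IncompleteRuntimeContext))  # False
print(can_instantiate(InMemoryRuntimeContext))    # True
```

Compared with `raise NotImplementedError(...)` in each method body, the abstract version reports a missing getter when a context is constructed, not when user code first touches state.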
##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -156,7 +159,8 @@ def get_reducing_state(self, state_descriptor: ReducingStateDescriptor) -> Reduc
This state is only accessible if the function is executed on a
KeyedStream.
"""
- pass
+ raise NotImplementedError(
Review comment:
ditto
##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -137,7 +138,8 @@ def get_list_state(self, state_descriptor: ListStateDescriptor) -> ListState:
This state is only accessible if the function is executed on a
KeyedStream.
"""
- pass
+ raise NotImplementedError(
+ "This state is only accessible by functions executed on a KeyedStream.")
Review comment:
ditto
##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -831,3 +836,96 @@ def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
invocation of this method, do not store it.
"""
pass
+
+
+class KeyedCoProcessFunction(Function):
+ """
+A function that processes elements of two keyed streams and produces a single output one.
+
+The function will be called for every element in the input streams and can produce zero or
+more output elements. Contrary to the :class:`CoFlatMapFunction`, this function can also query the
+time (both event and processing) and set timers, through the provided {@link Context}. When
+reacting to the firing of set timers the function can emit yet more elements.
+
+An example use-case for connected streams would be the application of a set of rules that
+change over time ({@code stream A}) to the elements contained in another stream (stream {@code
+B}). The rules contained in {@code stream A} can be stored in the state and wait for new elements
+to arrive on {@code stream B}. Upon reception of a new element on {@code stream B}, the function
+can now apply the previously stored rules to the element and directly emit a result, and/or
+register a timer that will trigger an action in the future.
+ """
+
+ class Context(ABC):
Review comment:
I don't see any implementation class that inherits from this abstract class. Did I miss something?
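For reference, a minimal sketch of what a concrete implementation of this abstract `Context` could look like. `KeyedCoProcessFunction` is trimmed here to the nested `Context` from the diff, and `InternalKeyedCoProcessContext` and its `set_current` method are hypothetical names, not existing PyFlink API.

```python
import abc
from abc import ABC
from typing import Any, Optional


class KeyedCoProcessFunction(ABC):
    """Trimmed copy of the class under review, keeping only the nested Context."""

    class Context(ABC):

        @abc.abstractmethod
        def get_current_key(self):
            pass

        @abc.abstractmethod
        def timer_service(self):
            pass

        @abc.abstractmethod
        def timestamp(self) -> Optional[int]:
            pass


class InternalKeyedCoProcessContext(KeyedCoProcessFunction.Context):
    """Hypothetical runtime-side implementation of the abstract Context."""

    def __init__(self, timer_service: Any):
        self._timer_service = timer_service
        self._current_key = None
        self._timestamp: Optional[int] = None

    def set_current(self, key, timestamp: Optional[int]) -> None:
        # The runtime would refresh these fields before invoking the
        # user function for each element or fired timer.
        self._current_key = key
        self._timestamp = timestamp

    def get_current_key(self):
        return self._current_key

    def timer_service(self):
        return self._timer_service

    def timestamp(self) -> Optional[int]:
        return self._timestamp


ctx = InternalKeyedCoProcessContext(timer_service=None)
ctx.set_current(key=("user-1",), timestamp=1000)
print(ctx.get_current_key(), ctx.timestamp())  # ('user-1',) 1000
```

Reusing one mutable context object and refreshing it per record avoids allocating a new context for every element.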
##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -146,7 +148,8 @@ def get_map_state(self, state_descriptor: MapStateDescriptor) -> MapState:
This state is only accessible if the function is executed on a
KeyedStream.
"""
- pass
+ raise NotImplementedError(
Review comment:
ditto
##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -831,3 +836,96 @@ def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
invocation of this method, do not store it.
"""
pass
+
+
+class KeyedCoProcessFunction(Function):
+
+ class Context(ABC):
+
+ @abc.abstractmethod
+ def get_current_key(self):
+ pass
+
+ @abc.abstractmethod
+ def timer_service(self) -> TimerService:
+ """
+ A Timer service for querying time and registering timers.
+ """
+ pass
+
+ @abc.abstractmethod
+ def timestamp(self) -> int:
+ """
+ Timestamp of the element currently being processed or timestamp of a firing timer.
+
+ This might be None, for example if the time characteristic of your program is set to
+ TimeCharacteristic.ProcessTime.
+ """
+ pass
+
+ class OnTimerContext(Context):
Review comment:
Ditto. Also, these two abstract classes are identical to the ones in `KeyedProcessFunction`; can we extract them?
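A minimal sketch of the extraction, assuming both functions need exactly the methods shown in the diff above (`get_current_key`, `timer_service`, `timestamp`); if the existing `KeyedProcessFunction` contexts declare more, those would move into the shared bases too. `KeyedContext`, `KeyedOnTimerContext` and `SimpleOnTimerContext` are hypothetical names.

```python
import abc
from abc import ABC


class KeyedContext(ABC):
    """Hypothetical shared base for the per-element context of keyed functions."""

    @abc.abstractmethod
    def get_current_key(self):
        pass

    @abc.abstractmethod
    def timer_service(self):
        pass

    @abc.abstractmethod
    def timestamp(self):
        pass


class KeyedOnTimerContext(KeyedContext):
    """Hypothetical shared base for the context passed to on_timer."""


class KeyedProcessFunction(ABC):
    # Keep the nested names as aliases so annotations such as
    # 'KeyedProcessFunction.OnTimerContext' keep resolving.
    Context = KeyedContext
    OnTimerContext = KeyedOnTimerContext


class KeyedCoProcessFunction(ABC):
    Context = KeyedContext
    OnTimerContext = KeyedOnTimerContext


class SimpleOnTimerContext(KeyedOnTimerContext):
    """One concrete implementation now serves both functions."""

    def __init__(self, key, timestamp):
        self._key = key
        self._timestamp = timestamp

    def get_current_key(self):
        return self._key

    def timer_service(self):
        return None

    def timestamp(self):
        return self._timestamp


ctx = SimpleOnTimerContext("k", 42)
print(isinstance(ctx, KeyedProcessFunction.OnTimerContext))  # True
print(isinstance(ctx, KeyedCoProcessFunction.Context))       # True
```

Aliasing rather than subclassing keeps the public nested names stable while leaving a single definition to maintain.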
##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+
+import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.KeyedProcessFunctionInputFlag.EVENT_TIME_TIMER;
+import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.KeyedProcessFunctionInputFlag.PROC_TIME_TIMER;
+import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.KeyedProcessFunctionOutputFlag.DEL_EVENT_TIMER;
+import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.KeyedProcessFunctionOutputFlag.DEL_PROC_TIMER;
+import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.KeyedProcessFunctionOutputFlag.REGISTER_EVENT_TIMER;
+import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.KeyedProcessFunctionOutputFlag.REGISTER_PROC_TIMER;
+
+/** KeyedCoProcessOperator. */
+public class PythonKeyedCoProcessOperator<OUT>
+ extends TwoInputPythonFunctionOperator<Row, Row, Row, OUT>
+ implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+ private static final String KEYED_CO_PROCESS_FUNCTION_URN =
+ "flink:transform:keyed_process_function:v1";
+
+ private static final String FLAT_MAP_CODER_URN = "flink:coder:flat_map:v1";
+
+ /** The TypeInformation of current key. */
+ private final TypeInformation<Row> keyTypeInfo;
+
+ /** Serializer for current key. */
+ private final TypeSerializer keyTypeSerializer;
+
+ private final TypeInformation<OUT> outputTypeInfo;
+
+ /** TimerService for current operator to register or fire timer. */
+ private transient TimerService timerService;
+
+ /** Reusable row for normal data runner inputs. */
+ private transient Row reusableInput;
+
+ /** Reusable row for timer data runner inputs. */
+ private transient Row reusableTimerData;
+
+ public PythonKeyedCoProcessOperator(
+ Configuration config,
+ TypeInformation<Row> inputTypeInfo1,
+ TypeInformation<Row> inputTypeInfo2,
+ TypeInformation<OUT> outputTypeInfo,
+ DataStreamPythonFunctionInfo pythonFunctionInfo) {
+ super(
+ config,
+ pythonFunctionInfo,
+ FLAT_MAP_CODER_URN,
+ constructRunnerInputTypeInfo(
+ inputTypeInfo1, inputTypeInfo2, constructKeyTypeInfo(inputTypeInfo1)),
+ constructRunnerOutputTypeInfo(
+ outputTypeInfo, constructKeyTypeInfo(inputTypeInfo1)));
+ this.keyTypeInfo = new RowTypeInfo(((RowTypeInfo) inputTypeInfo1).getTypeAt(0));
+ this.keyTypeSerializer =
+ PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(
+ keyTypeInfo);
+ this.outputTypeInfo = outputTypeInfo;
+ }
+
+ @Override
+ public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
+ return new BeamDataStreamPythonFunctionRunner(
+ getRuntimeContext().getTaskName(),
+ createPythonEnvironmentManager(),
+ getRunnerInputTypeInfo(),
+ getRunnerOutputTypeInfo(),
+ KEYED_CO_PROCESS_FUNCTION_URN,
+ PythonOperatorUtils.getUserDefinedDataStreamStatefulFunctionProto(
+ getPythonFunctionInfo(),
+ getRuntimeContext(),
+ Collections.EMPTY_MAP,
+ keyTypeInfo),
+ getCoderUrn(),
+ getJobOptions(),
+ getFlinkMetricContainer(),
+ getKeyedStateBackend(),
+ keyTypeSerializer,
+ getContainingTask().getEnvironment().getMemoryManager(),
+ getOperatorConfig()
+ .getManagedMemoryFractionOperatorUseCaseOfSlot(
+ ManagedMemoryUseCase.PYTHON,
+ getContainingTask()
+ .getEnvironment()
+ .getTaskManagerInfo()
+ .getConfiguration(),
+ getContainingTask()
+ .getEnvironment()
+ .getUserCodeClassLoader()
+ .asClassLoader()));
+ }
+
+ @Override
+ public void open() throws Exception {
+ InternalTimerService<VoidNamespace> internalTimerService =
+ getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
+ timerService = new SimpleTimerService(internalTimerService);
+ reusableInput = new Row(5);
+ reusableTimerData = new Row(5);
+
+ this.collector = new TimestampedCollector<>(output);
+ super.open();
+ }
+
+ @Override
+ public void processElement1(StreamRecord<Row> element) throws Exception {
+ bufferedTimestamp.offer(element.getTimestamp());
Review comment:
Could we move this line into `writeTimestampAndWatermark`?
##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
##########
@@ -0,0 +1,314 @@
+ @Override
+ public void processElement1(StreamRecord<Row> element) throws Exception {
+ bufferedTimestamp.offer(element.getTimestamp());
+ writeTimestampAndWatermark(reusableInput, element, timerService.currentWatermark());
+ writeInput1(reusableInput, reuseRow, element);
+
+ getRunnerInputTypeSerializer().serialize(reusableInput, baosWrapper);
+ pythonFunctionRunner.process(baos.toByteArray());
+ baos.reset();
+ elementCount++;
+ checkInvokeFinishBundleByCount();
+ emitResults();
+ }
+
+ @Override
+ public void processElement2(StreamRecord<Row> element) throws Exception {
+ bufferedTimestamp.offer(element.getTimestamp());
+ writeTimestampAndWatermark(reusableInput, element, timerService.currentWatermark());
+ writeInput2(reusableInput, reuseRow, element);
+
+ getRunnerInputTypeSerializer().serialize(reusableInput, baosWrapper);
+ pythonFunctionRunner.process(baos.toByteArray());
+ baos.reset();
+ elementCount++;
+ checkInvokeFinishBundleByCount();
+ emitResults();
+ }
+
+ @Override
+ public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
+ byte[] rawResult = resultTuple.f0;
+ int length = resultTuple.f1;
+ if (PythonOperatorUtils.endOfLastFlatMap(length, rawResult)) {
+ bufferedTimestamp.poll();
+ } else {
+ bais.setBuffer(rawResult, 0, length);
+ Row runnerOutput = getRunnerOutputTypeSerializer().deserialize(baisWrapper);
+ if (runnerOutput.getField(0) != null) {
+ registerTimer(runnerOutput);
+ } else {
+ collector.setAbsoluteTimestamp(bufferedTimestamp.peek());
+ collector.collect((OUT) runnerOutput.getField(3));
+ }
+ }
+ }
+
+ @Override
+ public TypeInformation<OUT> getProducedType() {
+ return outputTypeInfo;
+ }
+
+ @Override
+ public void onEventTime(InternalTimer<Row, VoidNamespace> timer) throws Exception {
+ bufferedTimestamp.offer(timer.getTimestamp());
+ processTimer(false, timer);
+ }
+
+ @Override
+ public void onProcessingTime(InternalTimer<Row, VoidNamespace> timer) throws Exception {
+ bufferedTimestamp.offer(Long.MIN_VALUE);
+ processTimer(true, timer);
+ }
+
+ private void writeTimestampAndWatermark(
+ Row reusableRunnerInput, StreamRecord<Row> element, long watermark) {
+ if (element.hasTimestamp()) {
+ reusableRunnerInput.setField(1, element.getTimestamp());
+ }
+ reusableRunnerInput.setField(2, timerService.currentWatermark());
+ }
+
+ private void writeInput1(
+ Row reusableRunnerInput, Row reusableUnifiedUserInput, StreamRecord<Row> element) {
+ reusableUnifiedUserInput.setField(0, true);
+ // The input row is a tuple of key and value.
+ reusableUnifiedUserInput.setField(1, element.getValue());
+ // need to set null since it is a reuse row.
+ reusableUnifiedUserInput.setField(2, null);
+
+ reusableRunnerInput.setField(4, reusableUnifiedUserInput);
+ }
+
+ private void writeInput2(
+ Row reusableRunnerInput, Row reusableUnifiedUserInput, StreamRecord<Row> element) {
+ reusableUnifiedUserInput.setField(0, false);
+ // need to set null since it is a reuse row.
+ reusableUnifiedUserInput.setField(1, null);
+ // The input row is a tuple of key and value.
+ reusableUnifiedUserInput.setField(2, element.getValue());
+
+ reusableRunnerInput.setField(4, reusableUnifiedUserInput);
+ }
+
+ /**
+ * It is responsible to send timer data to python worker when a registered timer is fired. The
+ * input data is a Row containing 4 fields: TimerFlag 0 for proc time, 1 for event time;
+ * Timestamp of the fired timer; Current watermark and the key of the timer.
+ *
+ * @param procTime Whether is it a proc time timer, otherwise event time timer.
+ * @param timer The fired timer.
+ * @throws Exception The runnerInputSerializer might throw exception.
+ */
+ private void processTimer(boolean procTime, InternalTimer<Row, VoidNamespace> timer)
Review comment:
I think using `TimeDomain` would be better than `boolean procTime`.
##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
##########
@@ -0,0 +1,314 @@
+ /**
+ * It is responsible to send timer data to python worker when a registered timer is fired. The
+ * input data is a Row containing 4 fields: TimerFlag 0 for proc time, 1 for event time;
+ * Timestamp of the fired timer; Current watermark and the key of the timer.
+ *
+ * @param procTime Whether is it a proc time timer, otherwise event time timer.
+ * @param timer The fired timer.
+ * @throws Exception The runnerInputSerializer might throw exception.
+ */
+ private void processTimer(boolean procTime, InternalTimer<Row, VoidNamespace> timer)
+ throws Exception {
+ long time = timer.getTimestamp();
+ Row timerKey = Row.of(timer.getKey());
+ if (procTime) {
+ reusableTimerData.setField(0, PROC_TIME_TIMER.value);
+ } else {
+ reusableTimerData.setField(0, EVENT_TIME_TIMER.value);
+ }
+ reusableTimerData.setField(1, time);
+ reusableTimerData.setField(2, timerService.currentWatermark());
+ reusableTimerData.setField(3, timerKey);
+ getRunnerInputTypeSerializer().serialize(reusableTimerData, baosWrapper);
+ pythonFunctionRunner.process(baos.toByteArray());
+ baos.reset();
+ elementCount++;
+ checkInvokeFinishBundleByCount();
+ emitResults();
+ }
+
+ /**
+ * Handler the timer registration request from python user defined function. Before registering
+ * the timer, we must set the current key to be the key when the timer is register in python
+ * side.
+ *
+ * @param runnerOutput The timer registration request data.
+ */
+ private void registerTimer(Row runnerOutput) {
+ synchronized (getKeyedStateBackend()) {
+ byte type = (byte) runnerOutput.getField(0);
+ long time = (long) runnerOutput.getField(1);
+ Object timerKey = ((Row) (runnerOutput.getField(2))).getField(0);
+ setCurrentKey(timerKey);
+ if (type == REGISTER_EVENT_TIMER.value) {
+ this.timerService.registerEventTimeTimer(time);
+ } else if (type == REGISTER_PROC_TIMER.value) {
+ this.timerService.registerProcessingTimeTimer(time);
+ } else if (type == DEL_EVENT_TIMER.value) {
+ this.timerService.deleteEventTimeTimer(time);
+ } else if (type == DEL_PROC_TIMER.value) {
+ this.timerService.deleteProcessingTimeTimer(time);
+ }
+ }
+ }
+
+ private static TypeInformation<Row> constructKeyTypeInfo(TypeInformation<Row> inputTypeInfo) {
+ return new RowTypeInfo(((RowTypeInfo) inputTypeInfo).getTypeAt(0));
+ }
+
+ private static TypeInformation<Row> constructRunnerInputTypeInfo(
+ TypeInformation<Row> inputTypeInfo1,
+ TypeInformation<Row> inputTypeInfo2,
+ TypeInformation<Row> keyTypeInfo) {
+ // structure: [isLeftUserInput, leftInput, rightInput]
+ RowTypeInfo unifiedInputTypeInfo =
+ new RowTypeInfo(Types.BOOLEAN, inputTypeInfo1, inputTypeInfo2);
+
+ // structure: [isTimerTrigger, timestamp, currentWatermark, key, userInput]
+ return Types.ROW(Types.BYTE, Types.LONG, Types.LONG, keyTypeInfo,
unifiedInputTypeInfo);
+ }
+
+ private static TypeInformation<Row> constructRunnerOutputTypeInfo(
+ TypeInformation<?> outputTypeInfo, TypeInformation<Row> keyTypeInfo) {
+ // structure: [isTimerRegistration, timestamp, key, userOutput]
Review comment:
The first field has four possible values (`register event timer`, `register proc
timer`, `del event timer` and `del proc timer`), so `isTimerRegistration`
is not an accurate name.
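To make the naming point concrete, here is a minimal sketch that treats field 0 of the runner output as a four-valued operation type and dispatches on it the way `registerTimer` does. The `TimerOperation` enum, its byte values and the `calls` recorder (standing in for Flink's `TimerService`) are hypothetical; they are not the actual `KeyedProcessFunctionOutputFlag` constants.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Flink's PythonOperatorUtils): field 0 of the runner
 * output carries one of four timer operations, so a boolean-style name such as
 * "isTimerRegistration" cannot describe it. Byte values are illustrative only.
 */
public class TimerOperationSketch {

    enum TimerOperation {
        REGISTER_EVENT_TIMER((byte) 0),
        REGISTER_PROC_TIMER((byte) 1),
        DEL_EVENT_TIMER((byte) 2),
        DEL_PROC_TIMER((byte) 3);

        final byte value;

        TimerOperation(byte value) {
            this.value = value;
        }

        /** Maps the raw flag read from field 0 back to an operation. */
        static TimerOperation fromValue(byte value) {
            for (TimerOperation op : values()) {
                if (op.value == value) {
                    return op;
                }
            }
            throw new IllegalArgumentException("Unknown timer operation: " + value);
        }
    }

    /** Stand-in for a timer service: records the calls it would receive. */
    static final List<String> calls = new ArrayList<>();

    /** Mirrors the if/else chain in registerTimer as a switch over the enum. */
    static void apply(byte rawType, long time) {
        switch (TimerOperation.fromValue(rawType)) {
            case REGISTER_EVENT_TIMER:
                calls.add("registerEventTimeTimer(" + time + ")");
                break;
            case REGISTER_PROC_TIMER:
                calls.add("registerProcessingTimeTimer(" + time + ")");
                break;
            case DEL_EVENT_TIMER:
                calls.add("deleteEventTimeTimer(" + time + ")");
                break;
            case DEL_PROC_TIMER:
                calls.add("deleteProcessingTimeTimer(" + time + ")");
                break;
        }
    }

    public static void main(String[] args) {
        apply((byte) 0, 1000L);
        apply((byte) 3, 2000L);
        // prints [registerEventTimeTimer(1000), deleteProcessingTimeTimer(2000)]
        System.out.println(calls);
    }
}
```

One design choice worth noting: the sketch throws on an unknown flag, whereas the if/else chain in `registerTimer` silently ignores it.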
##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
##########
@@ -0,0 +1,314 @@
+ private static TypeInformation<Row> constructRunnerInputTypeInfo(
+ TypeInformation<Row> inputTypeInfo1,
+ TypeInformation<Row> inputTypeInfo2,
+ TypeInformation<Row> keyTypeInfo) {
+ // structure: [isLeftUserInput, leftInput, rightInput]
+ RowTypeInfo unifiedInputTypeInfo =
+ new RowTypeInfo(Types.BOOLEAN, inputTypeInfo1, inputTypeInfo2);
+
+ // structure: [isTimerTrigger, timestamp, currentWatermark, key, userInput]
Review comment:
I think `TimerType` is better than `isTimerTrigger`
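A minimal sketch of reading field 0 of the runner input as a nullable timer type rather than a boolean trigger flag. In this PR, `processElement1`/`processElement2` never set field 0 (so it stays null for regular elements), while `processTimer` writes the timer flag there (0 for proc time, 1 for event time, per its Javadoc). The `TimerType` enum and the `Object[]` stand-in for `Row` are hypothetical.

```java
/**
 * Hypothetical sketch: field 0 of the runner input
 * [timerType, timestamp, currentWatermark, key, userInput] is null for a regular
 * element and otherwise names the kind of timer that fired.
 */
public class TimerTypeSketch {

    /** Values follow the processTimer Javadoc: 0 = proc time, 1 = event time. */
    enum TimerType {
        PROC_TIME((byte) 0),
        EVENT_TIME((byte) 1);

        final byte value;

        TimerType(byte value) {
            this.value = value;
        }
    }

    /** Returns the timer type stored in field 0, or null for a regular element. */
    static TimerType timerTypeOf(Object[] runnerInput) {
        Object flag = runnerInput[0];
        if (flag == null) {
            return null; // regular element from input 1 or input 2
        }
        byte value = (Byte) flag;
        for (TimerType type : TimerType.values()) {
            if (type.value == value) {
                return type;
            }
        }
        throw new IllegalArgumentException("Unsupported timer type: " + value);
    }

    public static void main(String[] args) {
        // Object[] stands in for Flink's Row; userInput is [isLeft, left, right].
        Object[] timerRow = {(byte) 1, 1000L, 900L, "key-a", null};
        Object[] elementRow = {null, 1000L, 900L, null, new Object[] {true, "left-value", null}};
        System.out.println(timerTypeOf(timerRow)); // prints EVENT_TIME
        System.out.println(timerTypeOf(elementRow)); // prints null
    }
}
```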
##########
File path: flink-python/pyflink/fn_execution/operation_utils.py
##########
@@ -293,52 +283,122 @@ def wrapped_process_function(value):
def extract_keyed_process_function(user_defined_function_proto, ctx, on_timer_ctx,
collector, keyed_state_backend):
+ func_type = user_defined_function_proto.function_type
+ UserDefinedDataStreamFunction = flink_fn_execution_pb2.UserDefinedDataStreamFunction
+ func = None
process_function = pickle.loads(user_defined_function_proto.payload)
- process_element = process_function.process_element
on_timer = process_function.on_timer
- def wrapped_keyed_process_function(value):
- if value[0] is not None:
- # it is timer data
- # VALUE: TIMER_FLAG, TIMESTAMP_OF_TIMER, CURRENT_WATERMARK, CURRENT_KEY_OF_TIMER, None
- on_timer_ctx.set_timestamp(value[1])
- on_timer_ctx.timer_service().set_current_watermark(value[2])
- current_key = value[3]
- on_timer_ctx.set_current_key(current_key)
- keyed_state_backend.set_current_key(current_key)
- if value[0] == KeyedProcessFunctionInputFlag.EVENT_TIME_TIMER.value:
- on_timer_ctx.set_time_domain(TimeDomain.EVENT_TIME)
- elif value[0] == KeyedProcessFunctionInputFlag.PROC_TIME_TIMER.value:
- on_timer_ctx.set_time_domain(TimeDomain.PROCESSING_TIME)
+ if func_type == UserDefinedDataStreamFunction.KEYED_PROCESS:
+ process_element = process_function.process_element
+
+ def wrapped_keyed_process_function(value):
+ if value[0] is not None:
+ # it is timer data
+ # VALUE:
+ # TIMER_FLAG, TIMESTAMP_OF_TIMER, CURRENT_WATERMARK, CURRENT_KEY_OF_TIMER, None
+ on_timer_ctx.set_timestamp(value[1])
+ on_timer_ctx.timer_service().set_current_watermark(value[2])
+ state_current_key = value[3]
+ user_current_key = state_current_key[0]
+ on_timer_ctx.set_current_key(user_current_key)
+ keyed_state_backend.set_current_key(state_current_key)
+ if value[0] == KeyedProcessFunctionInputFlag.EVENT_TIME_TIMER.value:
Review comment:
Can we use `TimerOperandType` uniformly as the flag?
##########
File path: flink-python/pyflink/fn_execution/operation_utils.py
##########
@@ -293,52 +283,122 @@ def wrapped_process_function(value):
+ if value[0] == KeyedProcessFunctionInputFlag.EVENT_TIME_TIMER.value:
+ on_timer_ctx.set_time_domain(TimeDomain.EVENT_TIME)
+ elif value[0] == KeyedProcessFunctionInputFlag.PROC_TIME_TIMER.value:
+ on_timer_ctx.set_time_domain(TimeDomain.PROCESSING_TIME)
+ else:
+ raise TypeError("TimeCharacteristic[%s] is not supported." % str(value[0]))
+ output_result = on_timer(value[1], on_timer_ctx)
else:
- raise TypeError("TimeCharacteristic[%s] is not supported." % str(value[0]))
- output_result = on_timer(value[1], on_timer_ctx)
- else:
- # it is normal data
- # VALUE: TIMER_FLAG, CURRENT_TIMESTAMP, CURRENT_WATERMARK, None, NORMAL_DATA
- # NORMAL_DATA: CURRENT_KEY, DATA
- ctx.set_timestamp(value[1])
- ctx.timer_service().set_current_watermark(value[2])
- current_key = value[4][0]
- ctx.set_current_key(current_key)
- keyed_state_backend.set_current_key(Row(current_key))
-
- output_result = process_element(value[4][1], ctx)
-
- if output_result:
- for result in output_result:
- yield Row(None, None, None, result)
-
- for result in collector.buf:
- # 0: proc time timer data
- # 1: event time timer data
- # 2: normal data
- # result_row: [TIMER_FLAG, TIMER TYPE, TIMER_KEY, RESULT_DATA]
- yield Row(result[0], result[1], result[2], None)
-
- collector.clear()
-
- return wrapped_keyed_process_function, process_function
+ # it is normal data
+ # VALUE: TIMER_FLAG, CURRENT_TIMESTAMP, CURRENT_WATERMARK, None, NORMAL_DATA
+ # NORMAL_DATA: CURRENT_KEY, DATA
+ ctx.set_timestamp(value[1])
+ ctx.timer_service().set_current_watermark(value[2])
+ user_current_key = value[4][0]
+ state_current_key = Row(user_current_key)
+ ctx.set_current_key(user_current_key)
+ keyed_state_backend.set_current_key(state_current_key)
+
+ output_result = process_element(value[4][1], ctx)
+
+ if output_result:
+ for result in output_result:
+ yield Row(None, None, None, result)
+
+ for result in collector.buf:
+ # 0: proc time timer data
+ # 1: event time timer data
+ # 2: normal data
+ # result_row: [TIMER_FLAG, TIMER TYPE, TIMER_KEY, RESULT_DATA]
+ yield Row(result[0], result[1], result[2], None)
+
+ collector.clear()
+
+ func = wrapped_keyed_process_function
+ elif func_type == UserDefinedDataStreamFunction.KEYED_CO_PROCESS:
+ process_element1 = process_function.process_element1
+ process_element2 = process_function.process_element2
+
+ def wrapped_keyed_co_process_function(value):
+ if value[0] is not None:
+ # it is timer data
+ # VALUE:
+ # TIMER_FLAG, TIMESTAMP_OF_TIMER, CURRENT_WATERMARK, CURRENT_KEY_OF_TIMER, None
+ on_timer_ctx.set_timestamp(value[1])
+ on_timer_ctx.timer_service().set_current_watermark(value[2])
+ state_current_key = value[3]
+ user_current_key = state_current_key[0]
+ on_timer_ctx.set_current_key(user_current_key)
+ keyed_state_backend.set_current_key(state_current_key)
+ if value[0] == KeyedProcessFunctionInputFlag.EVENT_TIME_TIMER.value:
Review comment:
ditto
##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
##########
@@ -0,0 +1,314 @@
+ @Override
+ public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
+ byte[] rawResult = resultTuple.f0;
+ int length = resultTuple.f1;
+ if (PythonOperatorUtils.endOfLastFlatMap(length, rawResult)) {
+ bufferedTimestamp.poll();
+ } else {
+ bais.setBuffer(rawResult, 0, length);
+ Row runnerOutput = getRunnerOutputTypeSerializer().deserialize(baisWrapper);
+ if (runnerOutput.getField(0) != null) {
+ registerTimer(runnerOutput);
+ } else {
+ collector.setAbsoluteTimestamp(bufferedTimestamp.peek());
+ collector.collect((OUT) runnerOutput.getField(3));
Review comment:
```suggestion
collector.collect(runnerOutput.getField(3));
```
##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import
org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+
+import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.KeyedProcessFunctionInputFlag.EVENT_TIME_TIMER;
+import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.KeyedProcessFunctionInputFlag.PROC_TIME_TIMER;
+import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.KeyedProcessFunctionOutputFlag.DEL_EVENT_TIMER;
+import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.KeyedProcessFunctionOutputFlag.DEL_PROC_TIMER;
+import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.KeyedProcessFunctionOutputFlag.REGISTER_EVENT_TIMER;
+import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.KeyedProcessFunctionOutputFlag.REGISTER_PROC_TIMER;
+
+/** KeyedCoProcessOperator. */
+public class PythonKeyedCoProcessOperator<OUT>
+ extends TwoInputPythonFunctionOperator<Row, Row, Row, OUT>
+ implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+ private static final String KEYED_CO_PROCESS_FUNCTION_URN =
+ "flink:transform:keyed_process_function:v1";
+
+ private static final String FLAT_MAP_CODER_URN = "flink:coder:flat_map:v1";
+
+ /** The TypeInformation of current key. */
+ private final TypeInformation<Row> keyTypeInfo;
+
+ /** Serializer for current key. */
+ private final TypeSerializer keyTypeSerializer;
+
+ private final TypeInformation<OUT> outputTypeInfo;
+
+ /** TimerService for current operator to register or fire timer. */
+ private transient TimerService timerService;
+
+ /** Reusable row for normal data runner inputs. */
+ private transient Row reusableInput;
+
+ /** Reusable row for timer data runner inputs. */
+ private transient Row reusableTimerData;
+
+ public PythonKeyedCoProcessOperator(
+ Configuration config,
+ TypeInformation<Row> inputTypeInfo1,
+ TypeInformation<Row> inputTypeInfo2,
+ TypeInformation<OUT> outputTypeInfo,
+ DataStreamPythonFunctionInfo pythonFunctionInfo) {
+ super(
+ config,
+ pythonFunctionInfo,
+ FLAT_MAP_CODER_URN,
+ constructRunnerInputTypeInfo(
+ inputTypeInfo1, inputTypeInfo2, constructKeyTypeInfo(inputTypeInfo1)),
+ constructRunnerOutputTypeInfo(
+ outputTypeInfo, constructKeyTypeInfo(inputTypeInfo1)));
+ this.keyTypeInfo = new RowTypeInfo(((RowTypeInfo) inputTypeInfo1).getTypeAt(0));
+ this.keyTypeSerializer =
+ PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(
+ keyTypeInfo);
+ this.outputTypeInfo = outputTypeInfo;
+ }
+
+ @Override
+ public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
+ return new BeamDataStreamPythonFunctionRunner(
+ getRuntimeContext().getTaskName(),
+ createPythonEnvironmentManager(),
+ getRunnerInputTypeInfo(),
+ getRunnerOutputTypeInfo(),
+ KEYED_CO_PROCESS_FUNCTION_URN,
+ PythonOperatorUtils.getUserDefinedDataStreamStatefulFunctionProto(
+ getPythonFunctionInfo(),
+ getRuntimeContext(),
+ Collections.EMPTY_MAP,
+ keyTypeInfo),
+ getCoderUrn(),
+ getJobOptions(),
+ getFlinkMetricContainer(),
+ getKeyedStateBackend(),
+ keyTypeSerializer,
+ getContainingTask().getEnvironment().getMemoryManager(),
+ getOperatorConfig()
+ .getManagedMemoryFractionOperatorUseCaseOfSlot(
+ ManagedMemoryUseCase.PYTHON,
+ getContainingTask()
+ .getEnvironment()
+ .getTaskManagerInfo()
+ .getConfiguration(),
+ getContainingTask()
+ .getEnvironment()
+ .getUserCodeClassLoader()
+ .asClassLoader()));
+ }
+
+ @Override
+ public void open() throws Exception {
+ InternalTimerService<VoidNamespace> internalTimerService =
+ getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
+ timerService = new SimpleTimerService(internalTimerService);
+ reusableInput = new Row(5);
+ reusableTimerData = new Row(5);
+
+ this.collector = new TimestampedCollector<>(output);
+ super.open();
+ }
+
+ @Override
+ public void processElement1(StreamRecord<Row> element) throws Exception {
+ bufferedTimestamp.offer(element.getTimestamp());
+ writeTimestampAndWatermark(reusableInput, element, timerService.currentWatermark());
+ writeInput1(reusableInput, reuseRow, element);
+
+ getRunnerInputTypeSerializer().serialize(reusableInput, baosWrapper);
+ pythonFunctionRunner.process(baos.toByteArray());
+ baos.reset();
+ elementCount++;
+ checkInvokeFinishBundleByCount();
+ emitResults();
+ }
+
+ @Override
+ public void processElement2(StreamRecord<Row> element) throws Exception {
+ bufferedTimestamp.offer(element.getTimestamp());
+ writeTimestampAndWatermark(reusableInput, element, timerService.currentWatermark());
+ writeInput2(reusableInput, reuseRow, element);
+
+ getRunnerInputTypeSerializer().serialize(reusableInput, baosWrapper);
+ pythonFunctionRunner.process(baos.toByteArray());
+ baos.reset();
+ elementCount++;
+ checkInvokeFinishBundleByCount();
+ emitResults();
+ }
+
+ @Override
+ public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
+ byte[] rawResult = resultTuple.f0;
+ int length = resultTuple.f1;
+ if (PythonOperatorUtils.endOfLastFlatMap(length, rawResult)) {
+ bufferedTimestamp.poll();
+ } else {
+ bais.setBuffer(rawResult, 0, length);
+ Row runnerOutput = getRunnerOutputTypeSerializer().deserialize(baisWrapper);
+ if (runnerOutput.getField(0) != null) {
+ registerTimer(runnerOutput);
+ } else {
+ collector.setAbsoluteTimestamp(bufferedTimestamp.peek());
+ collector.collect((OUT) runnerOutput.getField(3));
+ }
+ }
+ }
+
+ @Override
+ public TypeInformation<OUT> getProducedType() {
+ return outputTypeInfo;
+ }
+
+ @Override
+ public void onEventTime(InternalTimer<Row, VoidNamespace> timer) throws Exception {
+ bufferedTimestamp.offer(timer.getTimestamp());
+ processTimer(false, timer);
+ }
+
+ @Override
+ public void onProcessingTime(InternalTimer<Row, VoidNamespace> timer) throws Exception {
+ bufferedTimestamp.offer(Long.MIN_VALUE);
Review comment:
I think using `null` would be more reasonable than `Long.MIN_VALUE` here,
and the logic in `emitResult` needs to change accordingly.
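A minimal sketch of what the `null` approach might look like, in plain Java with no Flink dependency. `SketchCollector` is a hypothetical stand-in that only mirrors the `setAbsoluteTimestamp`/`eraseTimestamp` method names of Flink's `TimestampedCollector`; `NullableTimestampSketch` and its methods are likewise illustrative, not the operator's real API. One detail worth checking: `ArrayDeque` rejects `null` elements, so the timestamp buffer has to be a null-tolerant queue such as `LinkedList`.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for TimestampedCollector; records what would be emitted. */
class SketchCollector<T> {
    final List<String> emitted = new ArrayList<>();
    private Long timestamp; // null means "no timestamp attached"

    void setAbsoluteTimestamp(long ts) {
        timestamp = ts;
    }

    void eraseTimestamp() {
        timestamp = null;
    }

    void collect(T value) {
        emitted.add(value + "@" + (timestamp == null ? "none" : String.valueOf(timestamp)));
    }
}

/** Hypothetical, simplified model of the operator's timestamp handling. */
class NullableTimestampSketch {
    // LinkedList accepts null elements; ArrayDeque would throw NullPointerException.
    final Queue<Long> bufferedTimestamp = new LinkedList<>();
    final SketchCollector<String> collector = new SketchCollector<>();

    void onEventTime(long timerTimestamp) {
        bufferedTimestamp.offer(timerTimestamp);
    }

    void onProcessingTime() {
        // Buffer null instead of Long.MIN_VALUE for processing-time timers.
        bufferedTimestamp.offer(null);
    }

    void emitResult(String value) {
        Long ts = bufferedTimestamp.peek();
        if (ts == null) {
            collector.eraseTimestamp();
        } else {
            collector.setAbsoluteTimestamp(ts);
        }
        collector.collect(value);
    }

    void endOfLastFlatMap() {
        // Mirrors the poll() done once the runner signals the end of one input's results.
        bufferedTimestamp.poll();
    }
}
```

Note that `peek()` also returns `null` on an empty queue, so this scheme assumes `emitResult` is only reached while a timestamp for the current input is still buffered.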
##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
##########
@@ -0,0 +1,314 @@
+ @Override
+ public PythonFunctionRunner createPythonFunctionRunner() throws Exception {
Review comment:
I think we need to redesign the abstract method `createPythonFunctionRunner`.
Re-implementing it in every operator class is tedious and duplicates a lot of
code. We can create a separate JIRA ticket to address this.
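One possible shape for that redesign, sketched under assumptions: the base class owns runner construction as a template method, and subclasses override only small hooks such as the function URN. All class and method names below (`SketchRunner`, `SketchPythonOperatorBase`, `getFunctionUrn`, `getInternalParameters`) are hypothetical, and the runner is reduced to three fields instead of the real `BeamDataStreamPythonFunctionRunner` constructor arguments.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical, heavily simplified stand-in for a Python function runner. */
class SketchRunner {
    final String functionUrn;
    final String coderUrn;
    final Map<String, String> internalParameters;

    SketchRunner(String functionUrn, String coderUrn, Map<String, String> internalParameters) {
        this.functionUrn = functionUrn;
        this.coderUrn = coderUrn;
        this.internalParameters = internalParameters;
    }
}

/** Hypothetical base class: runner construction is written once, as a template method. */
abstract class SketchPythonOperatorBase {
    final SketchRunner createPythonFunctionRunner() {
        return new SketchRunner(getFunctionUrn(), getCoderUrn(), getInternalParameters());
    }

    // Hook every operator must supply.
    protected abstract String getFunctionUrn();

    // Hooks with defaults that operators override only when they differ.
    protected String getCoderUrn() {
        return "flink:coder:flat_map:v1";
    }

    protected Map<String, String> getInternalParameters() {
        return Collections.emptyMap();
    }
}

/** Hypothetical subclass: only the differing piece remains. */
class SketchKeyedCoProcessOperator extends SketchPythonOperatorBase {
    @Override
    protected String getFunctionUrn() {
        return "flink:transform:keyed_process_function:v1";
    }
}
```

As a side benefit, a shared default like `getInternalParameters()` can use the type-safe `Collections.emptyMap()` once, instead of each operator passing the raw `Collections.EMPTY_MAP`.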
##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
##########
@@ -0,0 +1,314 @@
+ @Override
+ public void processElement2(StreamRecord<Row> element) throws Exception {
Review comment:
I think most of the logic in `processElement1` and `processElement2` is the
same; they differ only in calling `writeInput1` versus `writeInput2`. We could
extract the shared logic into a separate method.
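A rough sketch of that extraction, assuming the per-input difference can be passed in as a callback. Strings and a `String[]` stand in for `Row` and `StreamRecord<Row>`, the serialize-and-process step is replaced by appending to a list, and every name here is hypothetical. The real `writeInput1`/`writeInput2` also take a reuse row, so the actual helper would probably need a small three-argument functional interface rather than `BiConsumer`.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for StreamRecord<Row>. */
class SketchRecord {
    final String value;
    final long timestamp;

    SketchRecord(String value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

/** Hypothetical two-input operator showing the shared-helper refactoring. */
class SketchTwoInputOperator {
    final Queue<Long> bufferedTimestamp = new LinkedList<>();
    final List<String> sentToRunner = new ArrayList<>();
    // Slot 0 holds input 1, slot 1 holds input 2 (stand-in for the reusable runner input row).
    private final String[] reusableInput = new String[2];
    int elementCount;

    void processElement1(SketchRecord element) {
        processElement(element, this::writeInput1);
    }

    void processElement2(SketchRecord element) {
        processElement(element, this::writeInput2);
    }

    // The logic shared by both inputs lives here exactly once.
    private void processElement(SketchRecord element, BiConsumer<String[], SketchRecord> writeInput) {
        bufferedTimestamp.offer(element.timestamp);
        writeInput.accept(reusableInput, element);
        // Stands in for serialize(...) followed by pythonFunctionRunner.process(...).
        sentToRunner.add(reusableInput[0] + "|" + reusableInput[1]);
        elementCount++;
        // checkInvokeFinishBundleByCount() and emitResults() would follow here.
    }

    private void writeInput1(String[] row, SketchRecord element) {
        row[0] = element.value;
        row[1] = null;
    }

    private void writeInput2(String[] row, SketchRecord element) {
        row[0] = null;
        row[1] = element.value;
    }
}
```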
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]