dianfu commented on a change in pull request #13803:
URL: https://github.com/apache/flink/pull/13803#discussion_r514719949
##########
File path: flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_timer_job.py
##########
@@ -0,0 +1,91 @@
+################################################################################
Review comment:
I suggest still using the name "data_stream_job"; the timer is only one
aspect, and in the future we could also reuse this test case to cover the
state tests.
##########
File path: flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_timer_job.py
##########
@@ -0,0 +1,91 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.common.serialization import SimpleStringSchema
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
+from pyflink.datastream.connectors import FlinkKafkaProducer
+from pyflink.datastream.functions import ProcessFunction, Collector
+from pyflink.table import StreamTableEnvironment
+
+from datastream.functions import MyKeySelector
+
+
+def test_ds_timer():
Review comment:
```suggestion
def python_data_stream_example():
```
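For readers skimming the thread, below is a rough sketch of the kind of keyed ProcessFunction with a timer this job exercises. It follows the imports visible in the diff (ProcessFunction, Collector), but the class name, the payload handling and the 3-second delay are illustrative assumptions, not the PR's actual code:
```python
from pyflink.datastream.functions import ProcessFunction, Collector


class PaymentProcessFunction(ProcessFunction):
    """Hypothetical example: forwards each element and schedules a timer."""

    def process_element(self, value, ctx: 'ProcessFunction.Context', out: 'Collector'):
        # Forward the element and register a processing-time timer that
        # fires 3 seconds (an illustrative delay) after the current time.
        out.collect(value)
        current_time = ctx.timer_service().current_processing_time()
        ctx.timer_service().register_processing_time_timer(current_time + 3000)

    def on_timer(self, timestamp, ctx: 'ProcessFunction.OnTimerContext', out: 'Collector'):
        # When the timer fires, emit a marker record with the firing timestamp.
        out.collect('on timer: ' + str(timestamp))
```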
##########
File path: flink-python/pyflink/datastream/functions.py
##########
@@ -540,3 +540,152 @@ def __init__(self, sink_func: Union[str, JavaObject]):
:param sink_func: The java SinkFunction object or the full name of the SinkFunction class.
"""
super(SinkFunction, self).__init__(sink_func)
+
+
+class ProcessFunction(Function):
+ """
+ A function that processes elements of a stream.
+
+ For every element in the input stream process_element(value, ctx, out) is invoked. This can
+ produce zero or more elements as output. Implementations can also query the time and set timers
+ through the provided Context. For firing timers on_timer(timestamp, ctx, out) will be invoked. This
+ can again produce zero or more elements as output and register further timers.
+
+ Note that access to keyed state and timers (which are also scoped to a key) is only available if
+ the ProcessFunction is applied on a KeyedStream.
+ """
+
+ @abc.abstractmethod
+ def process_element(self, value, ctx: 'Context', out: 'Collector'):
+ """
+ Process one element from the input stream.
+
+ This function can output zero or more elements using the Collector parameter and also update
+ internal state or set timers using the Context parameter.
+
+ :param value: The input value.
+ :param ctx: A Context that allows querying the timestamp of the element and getting a
+ TimerService for registering timers and querying the time. The context is only
+ valid during the invocation of this method, do not store it.
+ :param out: The collector for returning result values.
+ """
+ pass
+
+ @abc.abstractmethod
+ def on_timer(self, timestamp, ctx: 'OnTimerContext', out: 'Collector'):
+ """
+ Called when a timer set using TimerService fires.
+
+ :param timestamp: The timestamp of the firing timer.
+ :param ctx: An OnTimerContext that allows querying the timestamp of the firing timer,
+ querying the TimeDomain of the firing timer and getting a TimerService for
+ registering timers and querying the time. The context is only valid during the
+ invocation of this method, do not store it.
+ :param out: The collector for returning result values.
+ """
+ pass
+
+ class Context(abc.ABC):
+ """
+ Information available in an invocation of process_element(value, ctx, out) or
+ on_timer(timestamp, ctx, out).
+ """
+
+ @abc.abstractmethod
+ def timer_service(self) -> 'TimerService':
+ """
+ A Timer service for querying time and registering timers.
+ """
+ pass
+
+ class OnTimerContext(abc.ABC):
+ """
+ Information available in an invocation of on_timer(timestamp, ctx, out).
+ """
+ @abc.abstractmethod
+ def timer_service(self):
Review comment:
Shouldn't this method be named time_domain?
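If that is the intent, a minimal sketch of the suggested rename might look like the following, assuming a TimeDomain enum mirroring the Java ProcessFunction.OnTimerContext (the docstring wording here is mine):
```python
import abc


class OnTimerContext(abc.ABC):
    """
    Information available in an invocation of on_timer(timestamp, ctx, out).
    """

    @abc.abstractmethod
    def time_domain(self) -> 'TimeDomain':
        """
        The TimeDomain (processing time or event time) of the firing timer.
        """
        pass
```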
##########
File path: flink-python/pyflink/fn_execution/beam/beam_coders.py
##########
@@ -354,3 +355,29 @@ def _create_impl(self):
HIGHEST_PROTOCOL = pickle.HIGHEST_PROTOCOL
return coder_impl.CallbackCoderImpl(
lambda x: dumps(filter_data_views(x), HIGHEST_PROTOCOL),
pickle.loads)
+
+
+class BeamDataStreamStatefulMapCoder(FastCoder):
Review comment:
Is it possible to reuse BeamDataStreamStatelessFlatMapCoder? We may just need
to refactor the name a bit.
##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessFunctionOperator.java
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link PythonProcessFunctionOperator} is responsible for launching the Beam runner which will
+ * start a Python harness to execute the user-defined Python function. It is also able to handle
+ * the timer and state requests from the Python stateful user-defined function.
+ */
+@Internal
+public class PythonProcessFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+ implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+ "flink:transform:datastream_stateful_function:v1";
Review comment:
```suggestion
"flink:transform:process_function:v1";
```
##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessFunctionOperator.java
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link PythonProcessFunctionOperator} is responsible for launching the Beam runner which will
+ * start a Python harness to execute the user-defined Python function. It is also able to handle
+ * the timer and state requests from the Python stateful user-defined function.
+ */
+@Internal
+public class PythonProcessFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+ implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+ "flink:transform:datastream_stateful_function:v1";
+
+ private static final String DATA_STREAM_STATEFUL_PROCESS_FUNCTION_CODER_URN =
+ "flink:coder:datastream:stateful_process_function:v1";
Review comment:
```suggestion
"flink:coder:datastream:process_function:v1";
```
##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonProcessFunctionOperator.java
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
+import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.functions.python.PythonEnv;
+import org.apache.flink.table.runtime.util.StreamRecordCollector;
+import org.apache.flink.types.Row;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link PythonProcessFunctionOperator} is responsible for launching the Beam runner which will
+ * start a Python harness to execute the user-defined Python function. It is also able to handle
+ * the timer and state requests from the Python stateful user-defined function.
+ */
+@Internal
+public class PythonProcessFunctionOperator<OUT> extends AbstractOneInputPythonFunctionOperator<Row, OUT>
+ implements ResultTypeQueryable<OUT>, Triggerable<Row, VoidNamespace> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final String DATA_STREAM_STATEFUL_PYTHON_FUNCTION_URN =
+ "flink:transform:datastream_stateful_function:v1";
+
+ private static final String DATA_STREAM_STATEFUL_PROCESS_FUNCTION_CODER_URN =
+ "flink:coder:datastream:stateful_process_function:v1";
+
+ /**
+ * The python {@link org.apache.flink.streaming.api.functions.ProcessFunction} to be executed.
+ */
+ private final DataStreamPythonFunctionInfo pythonFunctionInfo;
+
+ /**
+ * The TypeInformation of python worker input data.
+ */
+ private final TypeInformation runnerInputTypeInfo;
+
+ /**
+ * The TypeInformation of python worker output data.
+ */
+ private final TypeInformation runnerOutputTypeInfo;
+
+ /**
+ * The TypeInformation of the output data of this operator.
+ */
+ private final TypeInformation<OUT> outputTypeInfo;
+
+ /**
+ * The TypeInformation of current key.
+ */
+ private final TypeInformation<Row> keyTypeInfo;
+
+ /**
+ * Serializer to serialize input data for python worker.
+ */
+ private transient TypeSerializer runnerInputSerializer;
+
+ /**
+ * Serializer to deserialize output data from python worker.
+ */
+ private transient TypeSerializer runnerOutputSerializer;
+
+ /**
+ * Serializer for current key.
+ */
+ private transient TypeSerializer keyTypeSerializer;
+
+ /**
+ * TimerService for current operator to register or fire timer.
+ */
+ private transient TimerService timerservice;
+
+ /**
+ * The options used to configure the Python worker process.
+ */
+ protected final Map<String, String> jobOptions;
+
+ /**
+ * Reusable InputStream used to hold the execution results to be deserialized.
+ */
+ protected transient ByteArrayInputStreamWithPos bais;
+
+ /**
+ * InputStream Wrapper.
+ */
+ protected transient DataInputViewStreamWrapper baisWrapper;
+
+ /**
+ * Reusable OutputStream used to hold the serialized input elements.
+ */
+ protected transient ByteArrayOutputStreamWithPos baos;
+
+ /**
+ * OutputStream Wrapper.
+ */
+ protected transient DataOutputViewStreamWrapper baosWrapper;
+
+ /**
+ * The collector for collecting output data to be emitted.
+ */
+ private transient StreamRecordCollector streamRecordCollector;
+
+ /**
+ * Reusable row for normal data runner inputs.
+ */
+ private transient Row reusableInput;
+
+ /**
+ * Reusable row for timer data runner inputs.
+ */
+ private transient Row reusableTimerData;
+
+ public PythonProcessFunctionOperator(
+ Configuration config,
+ RowTypeInfo inputTypeInfo,
+ TypeInformation<OUT> outputTypeInfo,
+ DataStreamPythonFunctionInfo pythonFunctionInfo) {
+ super(config);
+ this.jobOptions = config.toMap();
+ this.pythonFunctionInfo = pythonFunctionInfo;
+ this.outputTypeInfo = outputTypeInfo;
+ this.keyTypeInfo = new RowTypeInfo(inputTypeInfo.getTypeAt(0));
+ // inputType: normal data / timer data, timerType: proc / event time, currentWatermark, keyData, real data
+ this.runnerInputTypeInfo = Types.ROW(Types.INT, Types.LONG, Types.LONG, this.keyTypeInfo, inputTypeInfo);
+ this.runnerOutputTypeInfo = Types.ROW(Types.INT, Types.LONG, this.keyTypeInfo, outputTypeInfo);
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ bais = new ByteArrayInputStreamWithPos();
+ baisWrapper = new DataInputViewStreamWrapper(bais);
+
+ baos = new ByteArrayOutputStreamWithPos();
+ baosWrapper = new DataOutputViewStreamWrapper(baos);
+ keyTypeSerializer = PythonTypeUtils.TypeInfoToSerializerConverter.typeInfoSerializerConverter(keyTypeInfo);
+ runnerInputSerializer = PythonTypeUtils.TypeInfoToSerializerConverter
+ .typeInfoSerializerConverter(runnerInputTypeInfo);
+ runnerOutputSerializer = PythonTypeUtils.TypeInfoToSerializerConverter
+ .typeInfoSerializerConverter(runnerOutputTypeInfo);
+
+ InternalTimerService<VoidNamespace> internalTimerService = getInternalTimerService("user-timers",
+ VoidNamespaceSerializer.INSTANCE, this);
+
+ this.streamRecordCollector = new StreamRecordCollector(output);
Review comment:
```suggestion
streamRecordCollector = new StreamRecordCollector(output);
```
Nit: Just want to keep the code style consistent.
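As an aside for readers following the runner wire format, the constructor comment above fixes the field order of the rows exchanged with the Python worker. Below is a hedged Python-side sketch of the two input shapes it describes; the field order comes from the Types.ROW(...) definitions in the diff, while the flag values and helper names are purely illustrative assumptions:
```python
# Illustrative flag values; the real constants live on the Java/runner side.
NORMAL_DATA = 0
TIMER_DATA = 1

def make_data_input(watermark, key, value):
    # (inputType, timerType, currentWatermark, keyData, real data);
    # timerType is meaningless for a normal record, so -1 is a placeholder.
    return (NORMAL_DATA, -1, watermark, key, value)

def make_timer_input(timer_type, watermark, key):
    # A firing timer carries no real data payload.
    return (TIMER_DATA, timer_type, watermark, key, None)
```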
##########
File path: flink-end-to-end-tests/test-scripts/test_pyflink.sh
##########
@@ -217,58 +217,73 @@ setup_kafka_dist
start_kafka_cluster
-create_data_stream_kafka_source
+# End to end test for DataStream ProcessFunction with timer
+create_kafka_topic 1 1 timer-stream-source
+create_kafka_topic 1 1 timer-stream-sink
-create_kafka_topic 1 1 test-python-data-stream-sink
+PAYMENT_MSGS='{"createTime": "2020-10-26 10:30:13", "orderId": 1603679414, "payAmount": 83685.44904332698, "payPlatform": 0, "provinceId": 3}
+{"createTime": "2020-10-26 10:30:26", "orderId": 1603679427, "payAmount": 30092.50657757042, "payPlatform": 0, "provinceId": 1}
+{"createTime": "2020-10-26 10:30:27", "orderId": 1603679428, "payAmount": 62644.01719293056, "payPlatform": 0, "provinceId": 6}
+{"createTime": "2020-10-26 10:30:28", "orderId": 1603679429, "payAmount": 6449.806795118451, "payPlatform": 0, "provinceId": 2}
+{"createTime": "2020-10-26 10:31:31", "orderId": 1603679492, "payAmount": 41108.36128417494, "payPlatform": 0, "provinceId": 0}
+{"createTime": "2020-10-26 10:31:32", "orderId": 1603679493, "payAmount": 64882.44233197067, "payPlatform": 0, "provinceId": 4}
+{"createTime": "2020-10-26 10:32:01", "orderId": 1603679522, "payAmount": 81648.80712644062, "payPlatform": 0, "provinceId": 3}
+{"createTime": "2020-10-26 10:32:02", "orderId": 1603679523, "payAmount": 81861.73063103345, "payPlatform": 0, "provinceId": 4}'
-PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC}
+function send_msg_to_kafka {
+
+ while read line
+ do
+ send_messages_to_kafka "$line" "timer-stream-source"
+ sleep 3
+ done <<< "$1"
+}
JOB_ID=$(${FLINK_DIR}/bin/flink run \
-p 2 \
-pyfs "${FLINK_PYTHON_TEST_DIR}/python/datastream" \
-pyreq "${REQUIREMENTS_PATH}" \
-pyarch "${TEST_DATA_DIR}/venv.zip" \
-pyexec "venv.zip/.conda/bin/python" \
- -pym "data_stream_job" \
+ -pym "data_stream_timer_job" \
-j "${KAFKA_SQL_JAR}")
echo "${JOB_ID}"
JOB_ID=`echo "${JOB_ID}" | sed 's/.* //g'`
+wait_job_running ${JOB_ID}
+
+# wait 10s to ensure all tasks are up.
Review comment:
Isn't wait_job_running already making sure that all the subtasks are up?
##########
File path: flink-end-to-end-tests/test-scripts/test_pyflink.sh
##########
@@ -217,58 +217,73 @@ setup_kafka_dist
start_kafka_cluster
-create_data_stream_kafka_source
+# End to end test for DataStream ProcessFunction with timer
+create_kafka_topic 1 1 timer-stream-source
+create_kafka_topic 1 1 timer-stream-sink
-create_kafka_topic 1 1 test-python-data-stream-sink
+PAYMENT_MSGS='{"createTime": "2020-10-26 10:30:13", "orderId": 1603679414, "payAmount": 83685.44904332698, "payPlatform": 0, "provinceId": 3}
+{"createTime": "2020-10-26 10:30:26", "orderId": 1603679427, "payAmount": 30092.50657757042, "payPlatform": 0, "provinceId": 1}
+{"createTime": "2020-10-26 10:30:27", "orderId": 1603679428, "payAmount": 62644.01719293056, "payPlatform": 0, "provinceId": 6}
+{"createTime": "2020-10-26 10:30:28", "orderId": 1603679429, "payAmount": 6449.806795118451, "payPlatform": 0, "provinceId": 2}
+{"createTime": "2020-10-26 10:31:31", "orderId": 1603679492, "payAmount": 41108.36128417494, "payPlatform": 0, "provinceId": 0}
+{"createTime": "2020-10-26 10:31:32", "orderId": 1603679493, "payAmount": 64882.44233197067, "payPlatform": 0, "provinceId": 4}
+{"createTime": "2020-10-26 10:32:01", "orderId": 1603679522, "payAmount": 81648.80712644062, "payPlatform": 0, "provinceId": 3}
+{"createTime": "2020-10-26 10:32:02", "orderId": 1603679523, "payAmount": 81861.73063103345, "payPlatform": 0, "provinceId": 4}'
-PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC}
+function send_msg_to_kafka {
+
+ while read line
+ do
+ send_messages_to_kafka "$line" "timer-stream-source"
Review comment:
Please correct the indentation here.
##########
File path: flink-end-to-end-tests/test-scripts/test_pyflink.sh
##########
@@ -217,58 +217,73 @@ setup_kafka_dist
start_kafka_cluster
-create_data_stream_kafka_source
+# End to end test for DataStream ProcessFunction with timer
+create_kafka_topic 1 1 timer-stream-source
+create_kafka_topic 1 1 timer-stream-sink
-create_kafka_topic 1 1 test-python-data-stream-sink
+PAYMENT_MSGS='{"createTime": "2020-10-26 10:30:13", "orderId": 1603679414, "payAmount": 83685.44904332698, "payPlatform": 0, "provinceId": 3}
+{"createTime": "2020-10-26 10:30:26", "orderId": 1603679427, "payAmount": 30092.50657757042, "payPlatform": 0, "provinceId": 1}
+{"createTime": "2020-10-26 10:30:27", "orderId": 1603679428, "payAmount": 62644.01719293056, "payPlatform": 0, "provinceId": 6}
+{"createTime": "2020-10-26 10:30:28", "orderId": 1603679429, "payAmount": 6449.806795118451, "payPlatform": 0, "provinceId": 2}
+{"createTime": "2020-10-26 10:31:31", "orderId": 1603679492, "payAmount": 41108.36128417494, "payPlatform": 0, "provinceId": 0}
+{"createTime": "2020-10-26 10:31:32", "orderId": 1603679493, "payAmount": 64882.44233197067, "payPlatform": 0, "provinceId": 4}
+{"createTime": "2020-10-26 10:32:01", "orderId": 1603679522, "payAmount": 81648.80712644062, "payPlatform": 0, "provinceId": 3}
+{"createTime": "2020-10-26 10:32:02", "orderId": 1603679523, "payAmount": 81861.73063103345, "payPlatform": 0, "provinceId": 4}'
-PYFLINK_CLIENT_EXECUTABLE=${PYTHON_EXEC}
+function send_msg_to_kafka {
+
+ while read line
+ do
+ send_messages_to_kafka "$line" "timer-stream-source"
+ sleep 3
+ done <<< "$1"
+}
JOB_ID=$(${FLINK_DIR}/bin/flink run \
-p 2 \
Review comment:
The parallelism has been set to 1 in the job, so this option doesn't take effect.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]