shuiqiangchen commented on a change in pull request #13803:
URL: https://github.com/apache/flink/pull/13803#discussion_r517734385
##########
File path: flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
##########
@@ -16,43 +16,73 @@
# limitations under the License.
################################################################################
-from pyflink.common.serialization import JsonRowSerializationSchema, \
- JsonRowDeserializationSchema
+from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
-from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
+from pyflink.datastream.connectors import FlinkKafkaProducer
+from pyflink.datastream.functions import ProcessFunction, Collector
+from pyflink.table import StreamTableEnvironment
-from functions import m_flat_map, add_one
+from functions import MyKeySelector
def python_data_stream_example():
env = StreamExecutionEnvironment.get_execution_environment()
+ # Set the parallelism to be one to make sure that all data including fired timer and normal data
+ # are processed by the same worker and the collected result would be in order which is good for
+ # assertion.
+ env.set_parallelism(1)
+ env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+ t_env = StreamTableEnvironment.create(stream_execution_environment=env)
- source_type_info = Types.ROW([Types.STRING(), Types.INT()])
Review comment:
Currently, users cannot define a TimestampAssigner or watermark generator for a DataStream, so we create the DataStream from a table whose rowtime attribute and watermark strategy are specified via SQL DDL. Support for a user-defined WatermarkStrategy will be added in a later PR.
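For illustration, a minimal sketch of that workaround, assuming the PyFlink 1.12-era `StreamTableEnvironment` API (`execute_sql`, `from_path`, `to_append_stream`). The helper names `build_source_ddl` and `rowtime_stream`, the table name, the columns, the 2-second delay and the `datagen` connector are all hypothetical placeholders, not the actual job in this PR.

```python
# Hypothetical helpers: table/column names and connector options are
# illustrative assumptions, not taken from the PR.


def build_source_ddl(table_name: str, delay_seconds: int) -> str:
    """Build a CREATE TABLE statement whose `ts` column is the rowtime
    attribute, with a bounded-out-of-orderness watermark strategy."""
    return (
        f"CREATE TABLE {table_name} (\n"
        "  f0 STRING,\n"
        "  f1 INT,\n"
        "  ts TIMESTAMP(3),\n"
        # Watermark trails the largest seen `ts` by `delay_seconds`.
        f"  WATERMARK FOR ts AS ts - INTERVAL '{delay_seconds}' SECOND\n"
        ") WITH (\n"
        "  'connector' = 'datagen'\n"
        ")"
    )


def rowtime_stream(t_env, table_name: str = "source_table", delay_seconds: int = 2):
    """Register the DDL table and convert it into a DataStream.

    Assumes the PyFlink 1.12-era API; pyflink is imported lazily so the
    DDL helper above can be used without a Flink installation.
    """
    from pyflink.common.typeinfo import Types

    t_env.execute_sql(build_source_ddl(table_name, delay_seconds))
    table = t_env.from_path(table_name)
    # The rowtime column is carried into the stream as a regular field.
    return t_env.to_append_stream(
        table,
        Types.ROW([Types.STRING(), Types.INT(), Types.SQL_TIMESTAMP()]))
```

Because the event time and watermark are declared on the table, the resulting DataStream already carries them, which is what lets the job enable `TimeCharacteristic.EventTime` and rely on fired timers before a DataStream-level WatermarkStrategy exists.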
----------------------------------------------------------------
This is an automated message from the Apache Git Service.