shuiqiangchen commented on a change in pull request #13803:
URL: https://github.com/apache/flink/pull/13803#discussion_r517734385



##########
File path: flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
##########
@@ -16,43 +16,73 @@
 # limitations under the License.
 
################################################################################
 
-from pyflink.common.serialization import JsonRowSerializationSchema, \
-    JsonRowDeserializationSchema
+from pyflink.common.serialization import SimpleStringSchema
 from pyflink.common.typeinfo import Types
-from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
+from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
+from pyflink.datastream.connectors import FlinkKafkaProducer
+from pyflink.datastream.functions import ProcessFunction, Collector
+from pyflink.table import StreamTableEnvironment
 
-from functions import m_flat_map, add_one
+from functions import MyKeySelector
 
 
 def python_data_stream_example():
     env = StreamExecutionEnvironment.get_execution_environment()
+    # Set the parallelism to be one to make sure that all data including fired timer and normal data
+    # are processed by the same worker and the collected result would be in order which is good for
+    # assertion.
+    env.set_parallelism(1)
+    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
 
-    source_type_info = Types.ROW([Types.STRING(), Types.INT()])

Review comment:
       Currently, users cannot define a TimestampAssigner or a watermark
generator for a DataStream, so we create the DataStream from a table whose
rowtime attribute and watermark strategy are specified via a SQL DDL. Support
for a user-defined WatermarkStrategy on DataStream will be added in a later PR.
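
A minimal sketch of the workaround described above, not the code from this PR. The helper names, the table name `my_source`, the column names, and the connector options are all hypothetical. It assumes `StreamTableEnvironment.execute_sql`, `from_path`, and `to_append_stream` are available in the PyFlink version in use; the PyFlink import is done lazily so the DDL helper can be checked on its own.

```python
def build_source_ddl(table_name, rowtime_column, max_delay_seconds, connector_options):
    """Build a CREATE TABLE statement that declares an event-time column
    and a bounded-out-of-orderness watermark on it (hypothetical schema)."""
    options = ",\n    ".join(
        "'{}' = '{}'".format(key, value) for key, value in connector_options.items()
    )
    return (
        "CREATE TABLE {name} (\n"
        "    a INT,\n"
        "    b VARCHAR,\n"
        "    {ts} TIMESTAMP(3),\n"
        "    WATERMARK FOR {ts} AS {ts} - INTERVAL '{delay}' SECOND\n"
        ") WITH (\n"
        "    {options}\n"
        ")"
    ).format(name=table_name, ts=rowtime_column, delay=max_delay_seconds, options=options)


def create_stream_with_watermark(env, ddl, table_name, type_info):
    """Register the table and convert it to a DataStream.

    Assumption: requires PyFlink; imported here so the helper above
    stays usable without it.
    """
    from pyflink.table import StreamTableEnvironment

    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    t_env.execute_sql(ddl)
    # The resulting stream carries the timestamps and watermarks declared in the DDL.
    return t_env.to_append_stream(t_env.from_path(table_name), type_info)


# Hypothetical connector options; replace with the real source configuration.
ddl = build_source_ddl("my_source", "ts", 5, {"connector": "datagen"})
print(ddl)
```

Keeping the watermark definition in the DDL means the DataStream job itself needs no timestamp or watermark code until a DataStream-level WatermarkStrategy is supported.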





