HuangXingBo commented on a change in pull request #15733:
URL: https://github.com/apache/flink/pull/15733#discussion_r619737364
##########
File path: docs/content.zh/docs/dev/python/datastream/operators/overview.md
##########
@@ -87,3 +78,87 @@ data_stream = env.from_collection([1, 2, 3, 4, 5],
type_info=Types.INT())
mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())
```
+## Output Type
+
+Users could specify the output type information of the transformation
explicitly in Python DataStream API. If not
+specified, the output type will be `Types.PICKLED_BYTE_ARRAY` by default, and
the result data will be serialized using pickle serializer.
+For more details about the pickle serializer, please refer to [Pickle
Serialization]({{< ref "docs/dev/python/datastream/data_types"
>}}#pickle-serialization).
+
+The output type must usually be specified in the following scenarios.
Review comment:
```suggestion
Generally, the output type must be specified in the following scenarios.
```
##########
File path: docs/content.zh/docs/dev/python/datastream/operators/overview.md
##########
@@ -87,3 +78,87 @@ data_stream = env.from_collection([1, 2, 3, 4, 5],
type_info=Types.INT())
mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())
```
+## Output Type
+
+Users could specify the output type information of the transformation
explicitly in Python DataStream API. If not
+specified, the output type will be `Types.PICKLED_BYTE_ARRAY` by default, and
the result data will be serialized using pickle serializer.
+For more details about the pickle serializer, please refer to [Pickle
Serialization]({{< ref "docs/dev/python/datastream/data_types"
>}}#pickle-serialization).
+
+The output type must usually be specified in the following scenarios.
+
+### Convert DataStream into Table
+
+```python
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment
+
+
+def data_stream_api_demo():
+ env = StreamExecutionEnvironment.get_execution_environment()
+ t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+
+ t_env.execute_sql("""
+ CREATE TABLE my_source (
+ a INT,
+ b VARCHAR
+ ) WITH (
+ 'connector' = 'datagen',
+ 'number-of-rows' = '10'
+ )
+ """)
+
+ ds = t_env.to_append_stream(
+ t_env.from_path('my_source'),
+ Types.ROW([Types.INT(), Types.STRING()]))
+
+ def split(s):
+ splits = s[1].split("|")
+ for sp in splits:
+ yield s[0], sp
+
+ ds = ds.map(lambda i: (i[0] + 1, i[1])) \
+ .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
+ .key_by(lambda i: i[1]) \
+ .reduce(lambda i, j: (i[0] + j[0], i[1]))
+
+ t_env.execute_sql("""
+ CREATE TABLE my_sink (
+ a INT,
+ b VARCHAR
+ ) WITH (
+ 'connector' = 'print'
+ )
+ """)
+
+ table = t_env.from_data_stream(ds)
+ table_result = table.execute_insert("my_sink")
+
+ # 1)等待作业执行结束,用于local执行,否则可能作业尚未执行结束,该脚本已退出,会导致minicluster过早退出
+ # 2)当作业通过detach模式往remote集群提交时,比如YARN/Standalone/K8s等,需要移除该方法
+ table_result.wait()
+
+
+if __name__ == '__main__':
+ data_stream_api_demo()
+```
+
+The output type must be specified for the flat_map operation in the above
example which will be used as
+the output type of the reduce operation implicitly. The reason is that
+`t_env.from_data_stream(ds)` requires the output type of `ds` must be a
composite type.
+
+### Write DataStream to Sink
+
+```python
+from pyflink.common.typeinfo import Types
+
+def split(s):
+ splits = s[1].split("|")
+ for sp in splits:
+ yield s[0], sp
+
+ds.map(lambda i: (i[0] + 1, i[1]), Types.TUPLE([Types.INT(), Types.STRING()]))
\
+ .sink_to(...)
+```
+
+The output type should usually be specified for the map operation in the above
example if the sink only accepts special kinds of data, e.g. Row, etc.
Review comment:
```suggestion
Generally, the output type should be specified for the map operation in the
above example if the sink only accepts special kinds of data, e.g. Row, etc.
```
##########
File path: docs/content.zh/docs/dev/datastream/operators/process_function.md
##########
@@ -327,6 +422,12 @@ val coalescedTime = ((ctx.timestamp + timeout) / 1000) *
1000
ctx.timerService.registerProcessingTimeTimer(coalescedTime)
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+coalesced_time = ((ctx.timestamp() + timeout) / 1000) * 1000
Review comment:
```suggestion
coalesced_time = ((ctx.timestamp() + timeout) // 1000) * 1000
```
##########
File path: docs/content/docs/dev/datastream/operators/process_function.md
##########
@@ -327,6 +422,12 @@ val coalescedTime = ((ctx.timestamp + timeout) / 1000) *
1000
ctx.timerService.registerProcessingTimeTimer(coalescedTime)
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+coalesced_time = ((ctx.timestamp() + timeout) / 1000) * 1000
Review comment:
```suggestion
coalesced_time = ((ctx.timestamp() + timeout) // 1000) * 1000
```
##########
File path: docs/content/docs/dev/python/datastream/operators/overview.md
##########
@@ -87,3 +78,88 @@ data_stream = env.from_collection([1, 2, 3, 4, 5],
type_info=Types.INT())
mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())
```
+## Output Type
+
+Users could specify the output type information of the transformation
explicitly in Python DataStream API. If not
+specified, the output type will be `Types.PICKLED_BYTE_ARRAY` by default, and
the result data will be serialized using pickle serializer.
+For more details about the pickle serializer, please refer to [Pickle
Serialization]({{< ref "docs/dev/python/datastream/data_types"
>}}#pickle-serialization).
+
+The output type must usually be specified in the following scenarios.
+
+### Convert DataStream into Table
+
+```python
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment
+
+
+def data_stream_api_demo():
+ env = StreamExecutionEnvironment.get_execution_environment()
+ t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+
+ t_env.execute_sql("""
+ CREATE TABLE my_source (
+ a INT,
+ b VARCHAR
+ ) WITH (
+ 'connector' = 'datagen',
+ 'number-of-rows' = '10'
+ )
+ """)
+
+ ds = t_env.to_append_stream(
+ t_env.from_path('my_source'),
+ Types.ROW([Types.INT(), Types.STRING()]))
+
+ def split(s):
+ splits = s[1].split("|")
+ for sp in splits:
+ yield s[0], sp
+
+ ds = ds.map(lambda i: (i[0] + 1, i[1])) \
+ .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
+ .key_by(lambda i: i[1]) \
+ .reduce(lambda i, j: (i[0] + j[0], i[1]))
+
+ t_env.execute_sql("""
+ CREATE TABLE my_sink (
+ a INT,
+ b VARCHAR
+ ) WITH (
+ 'connector' = 'print'
+ )
+ """)
+
+ table = t_env.from_data_stream(ds)
+ table_result = table.execute_insert("my_sink")
+
+ # 1)wait for job finishes and only used in local execution, otherwise, it
may happen that the script exits with the job is still running
+ # 2)should be removed when submitting the job to a remote cluster such as
YARN, standalone, K8s etc in detach mode
+ table_result.wait()
+
+
+if __name__ == '__main__':
+ data_stream_api_demo()
+```
+
+The output type must be specified for the flat_map operation in the above
example which will be used as
+the output type of the reduce operation implicitly. The reason is that
+`t_env.from_data_stream(ds)` requires the output type of `ds` must be a
composite type.
+
+### Write DataStream to Sink
+
+```python
+from pyflink.common.typeinfo import Types
+
+def split(s):
+ splits = s[1].split("|")
+ for sp in splits:
+ yield s[0], sp
+
+ds.map(lambda i: (i[0] + 1, i[1]), Types.TUPLE([Types.INT(), Types.STRING()]))
\
+ .sink_to(...)
+```
+
+The output type should usually be specified for the map operation in the above
example if the sink only accepts special kinds of data, e.g. Row, etc.
Review comment:
```suggestion
Generally, the output type should be specified for the map operation in the
above example if the sink only accepts special kinds of data, e.g. Row, etc.
```
##########
File path: docs/content.zh/docs/dev/datastream/operators/process_function.md
##########
@@ -246,6 +246,94 @@ class CountWithTimeoutFunction extends
KeyedProcessFunction[Tuple, (String, Stri
}
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+import datetime
+
+from pyflink.common import Row, WatermarkStrategy
+from pyflink.common.typeinfo import Types
+from pyflink.common.watermark_strategy import TimestampAssigner
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
+from pyflink.datastream.state import ValueStateDescriptor
+from pyflink.table import StreamTableEnvironment
+
+
+class CountWithTimeoutFunction(KeyedProcessFunction):
+
+ def __init__(self):
+ self.state = None
+
+ def open(self, runtime_context: RuntimeContext):
+ self.state = runtime_context.get_state(ValueStateDescriptor(
+ "my_state", Types.ROW([Types.STRING(), Types.LONG(),
Types.LONG()])))
+
+ def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
+ # retrieve the current count
+ current = self.state.value()
+ if current is None:
+ current = Row(value.f1, 0, 0)
+
+ # update the state's count
+ current[1] += 1
+
+ # set the state's timestamp to the record's assigned event time
timestamp
+ current[2] = ctx.timestamp()
+
+ # write the state back
+ self.state.update(current)
+
+ # schedule the next timer 60 seconds from the current event time
+ ctx.timer_service().register_event_time_timer(current[2] + 60000)
+
+ def on_timer(self, timestamp: int, ctx:
'KeyedProcessFunction.OnTimerContext'):
+ # get the state for the key that scheduled the timer
+ result = self.state.value()
+
+ # check if this is an outdated timer or the latest timer
+ if timestamp == result[2] + 60000:
+ # emit the state on timeout
+ yield result[0], result[1]
+
+
+class MyTimestampAssigner(TimestampAssigner):
+
+ def __init__(self):
+ self.epoch = datetime.datetime.utcfromtimestamp(0)
+
+ def extract_timestamp(self, value, record_timestamp) -> int:
+ return (value[0] - self.epoch).total_seconds() * 1000.0
Review comment:
```suggestion
return int((value[0] - self.epoch).total_seconds() * 1000)
```
##########
File path: docs/content/docs/dev/datastream/operators/process_function.md
##########
@@ -246,6 +246,94 @@ class CountWithTimeoutFunction extends
KeyedProcessFunction[Tuple, (String, Stri
}
```
{{< /tab >}}
+{{< tab "Python" >}}
+```python
+import datetime
+
+from pyflink.common import Row, WatermarkStrategy
+from pyflink.common.typeinfo import Types
+from pyflink.common.watermark_strategy import TimestampAssigner
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
+from pyflink.datastream.state import ValueStateDescriptor
+from pyflink.table import StreamTableEnvironment
+
+
+class CountWithTimeoutFunction(KeyedProcessFunction):
+
+ def __init__(self):
+ self.state = None
+
+ def open(self, runtime_context: RuntimeContext):
+ self.state = runtime_context.get_state(ValueStateDescriptor(
+ "my_state", Types.ROW([Types.STRING(), Types.LONG(),
Types.LONG()])))
+
+ def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
+ # retrieve the current count
+ current = self.state.value()
+ if current is None:
+ current = Row(value.f1, 0, 0)
+
+ # update the state's count
+ current[1] += 1
+
+ # set the state's timestamp to the record's assigned event time
timestamp
+ current[2] = ctx.timestamp()
+
+ # write the state back
+ self.state.update(current)
+
+ # schedule the next timer 60 seconds from the current event time
+ ctx.timer_service().register_event_time_timer(current[2] + 60000)
+
+ def on_timer(self, timestamp: int, ctx:
'KeyedProcessFunction.OnTimerContext'):
+ # get the state for the key that scheduled the timer
+ result = self.state.value()
+
+ # check if this is an outdated timer or the latest timer
+ if timestamp == result[2] + 60000:
+ # emit the state on timeout
+ yield result[0], result[1]
+
+
+class MyTimestampAssigner(TimestampAssigner):
+
+ def __init__(self):
+ self.epoch = datetime.datetime.utcfromtimestamp(0)
+
+ def extract_timestamp(self, value, record_timestamp) -> int:
+ return (value[0] - self.epoch).total_seconds() * 1000.0
Review comment:
```suggestion
return int((value[0] - self.epoch).total_seconds() * 1000)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]