This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new db23add [FLINK-26969][python][examples] Add a few examples of window operation in Python DataStream API
db23add is described below
commit db23add96305e163328283859fb644af72350018
Author: zhangjingcun <[email protected]>
AuthorDate: Fri Apr 1 19:51:04 2022 +0800
[FLINK-26969][python][examples] Add a few examples of window operation in Python DataStream API
This closes #19328.
---
.../pyflink/datastream/tests/test_window.py | 2 +-
.../examples/datastream/windowing/__init__.py | 17 ++++
.../windowing/session_with_dynamic_gap_window.py | 103 +++++++++++++++++++++
.../windowing/session_with_gap_window.py | 103 +++++++++++++++++++++
.../datastream/windowing/sliding_time_window.py | 97 +++++++++++++++++++
.../datastream/windowing/tumbling_count_window.py | 82 ++++++++++++++++
.../datastream/windowing/tumbling_time_window.py | 97 +++++++++++++++++++
7 files changed, 500 insertions(+), 1 deletion(-)
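For readers skimming the patch: the event-time examples added below all follow the same keyed-window pattern (assign timestamps and watermarks, key the stream, apply a window assigner, then aggregate each window with a ProcessWindowFunction). The following is a minimal standalone sketch of that pattern, condensed from the tumbling_time_window.py example in this patch; it prints to stdout instead of using a FileSink, assumes a local PyFlink installation, and the class names are illustrative rather than part of the patch:

    from typing import Iterable

    from pyflink.common import Time, Types, WatermarkStrategy
    from pyflink.common.watermark_strategy import TimestampAssigner
    from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
    from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow


    class SecondFieldTimestampAssigner(TimestampAssigner):
        # use the second tuple field as the event timestamp (illustrative name)
        def extract_timestamp(self, value, record_timestamp) -> int:
            return int(value[1])


    class WindowCountFunction(ProcessWindowFunction[tuple, tuple, str, TimeWindow]):
        # emit one (key, window_start, window_end, element_count) record per window
        def process(self, key: str, context: ProcessWindowFunction.Context,
                    elements: Iterable[tuple]) -> Iterable[tuple]:
            return [(key, context.window().start, context.window().end, len(list(elements)))]

        def clear(self, context: ProcessWindowFunction.Context) -> None:
            pass


    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    ds = env.from_collection(
        [('hi', 1), ('hi', 2), ('hi', 3), ('hi', 8), ('hi', 15)],
        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))

    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
        .with_timestamp_assigner(SecondFieldTimestampAssigner())

    ds.assign_timestamps_and_watermarks(watermark_strategy) \
        .key_by(lambda x: x[0], key_type=Types.STRING()) \
        .window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \
        .process(WindowCountFunction(),
                 Types.TUPLE([Types.STRING(), Types.INT(), Types.INT(), Types.INT()])) \
        .print()

    env.execute()

The session and sliding examples below differ only in the assigner passed to .window() (EventTimeSessionWindows.with_gap / with_dynamic_gap, SlidingEventTimeWindows.of), while tumbling_count_window.py uses count_window(2) with a WindowFunction instead of a ProcessWindowFunction.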
diff --git a/flink-python/pyflink/datastream/tests/test_window.py b/flink-python/pyflink/datastream/tests/test_window.py
index 72d7c0d..b8cdf7e 100644
--- a/flink-python/pyflink/datastream/tests/test_window.py
+++ b/flink-python/pyflink/datastream/tests/test_window.py
@@ -372,7 +372,7 @@ class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, CountW

     def process(self,
                 key: str,
-                content: ProcessWindowFunction.Context,
+                context: ProcessWindowFunction.Context,
                 elements: Iterable[tuple]) -> Iterable[tuple]:
         return [(key, len([e for e in elements]))]

diff --git a/flink-python/pyflink/examples/datastream/windowing/__init__.py b/flink-python/pyflink/examples/datastream/windowing/__init__.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/windowing/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/flink-python/pyflink/examples/datastream/windowing/session_with_dynamic_gap_window.py b/flink-python/pyflink/examples/datastream/windowing/session_with_dynamic_gap_window.py
new file mode 100644
index 0000000..c2f495b
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/windowing/session_with_dynamic_gap_window.py
@@ -0,0 +1,103 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+
+import argparse
+from typing import Iterable
+
+from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy
+
+from pyflink.common import Types, WatermarkStrategy, Encoder
+from pyflink.common.watermark_strategy import TimestampAssigner
+from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
+from pyflink.datastream.window import EventTimeSessionWindows, \
+    SessionWindowTimeGapExtractor, TimeWindow
+
+
+class MyTimestampAssigner(TimestampAssigner):
+    def extract_timestamp(self, value, record_timestamp) -> int:
+        return int(value[1])
+
+
+class MySessionWindowTimeGapExtractor(SessionWindowTimeGapExtractor):
+    def extract(self, element: tuple) -> int:
+        return element[1]
+
+
+class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, TimeWindow]):
+    def process(self,
+                key: str,
+                context: ProcessWindowFunction.Context[TimeWindow],
+                elements: Iterable[tuple]) -> Iterable[tuple]:
+        return [(key, context.window().start, context.window().end, len([e for e in elements]))]
+
+    def clear(self, context: ProcessWindowFunction.Context) -> None:
+        pass
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        '--output',
+        dest='output',
+        required=False,
+        help='Output file to write results to.')
+
+    argv = sys.argv[1:]
+    known_args, _ = parser.parse_known_args(argv)
+    output_path = known_args.output
+
+    env = StreamExecutionEnvironment.get_execution_environment()
+    # write all the data to one file
+    env.set_parallelism(1)
+
+    # define the source
+    data_stream = env.from_collection([
+        ('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 8), ('hi', 9), ('hi', 15)],
+        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
+
+    # define the watermark strategy
+    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
+        .with_timestamp_assigner(MyTimestampAssigner())
+
+    ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
+        .key_by(lambda x: x[0], key_type=Types.STRING()) \
+        .window(EventTimeSessionWindows.with_dynamic_gap(MySessionWindowTimeGapExtractor())) \
+        .process(CountWindowProcessFunction(),
+                 Types.TUPLE([Types.STRING(), Types.INT(), Types.INT(), Types.INT()]))
+
+    # define the sink
+    if output_path is not None:
+        ds.sink_to(
+            sink=FileSink.for_row_format(
+                base_path=output_path,
+                encoder=Encoder.simple_string_encoder())
+            .with_output_file_config(
+                OutputFileConfig.builder()
+                .with_part_prefix("prefix")
+                .with_part_suffix(".ext")
+                .build())
+            .with_rolling_policy(RollingPolicy.default_rolling_policy())
+            .build()
+        )
+    else:
+        print("Printing result to stdout. Use --output to specify output path.")
+        ds.print()
+
+    # submit for execution
+    env.execute()
diff --git a/flink-python/pyflink/examples/datastream/windowing/session_with_gap_window.py b/flink-python/pyflink/examples/datastream/windowing/session_with_gap_window.py
new file mode 100644
index 0000000..524a889
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/windowing/session_with_gap_window.py
@@ -0,0 +1,103 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+
+import argparse
+from typing import Iterable
+
+from pyflink.datastream.connectors import FileSink, RollingPolicy, OutputFileConfig
+
+from pyflink.common import Types, WatermarkStrategy, Time, Encoder
+from pyflink.common.watermark_strategy import TimestampAssigner
+from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
+from pyflink.datastream.window import EventTimeSessionWindows, \
+    SessionWindowTimeGapExtractor, TimeWindow
+
+
+class MyTimestampAssigner(TimestampAssigner):
+    def extract_timestamp(self, value, record_timestamp) -> int:
+        return int(value[1])
+
+
+class MySessionWindowTimeGapExtractor(SessionWindowTimeGapExtractor):
+    def extract(self, element: tuple) -> int:
+        return element[1]
+
+
+class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, TimeWindow]):
+    def process(self,
+                key: str,
+                context: ProcessWindowFunction.Context[TimeWindow],
+                elements: Iterable[tuple]) -> Iterable[tuple]:
+        return [(key, context.window().start, context.window().end, len([e for e in elements]))]
+
+    def clear(self, context: ProcessWindowFunction.Context) -> None:
+        pass
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        '--output',
+        dest='output',
+        required=False,
+        help='Output file to write results to.')
+
+    argv = sys.argv[1:]
+    known_args, _ = parser.parse_known_args(argv)
+    output_path = known_args.output
+
+    env = StreamExecutionEnvironment.get_execution_environment()
+    # write all the data to one file
+    env.set_parallelism(1)
+
+    # define the source
+    data_stream = env.from_collection([
+        ('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 8), ('hi', 9), ('hi', 15)],
+        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
+
+    # define the watermark strategy
+    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
+        .with_timestamp_assigner(MyTimestampAssigner())
+
+    ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
+        .key_by(lambda x: x[0], key_type=Types.STRING()) \
+        .window(EventTimeSessionWindows.with_gap(Time.milliseconds(5))) \
+        .process(CountWindowProcessFunction(),
+                 Types.TUPLE([Types.STRING(), Types.INT(), Types.INT(), Types.INT()]))
+
+    # define the sink
+    if output_path is not None:
+        ds.sink_to(
+            sink=FileSink.for_row_format(
+                base_path=output_path,
+                encoder=Encoder.simple_string_encoder())
+            .with_output_file_config(
+                OutputFileConfig.builder()
+                .with_part_prefix("prefix")
+                .with_part_suffix(".ext")
+                .build())
+            .with_rolling_policy(RollingPolicy.default_rolling_policy())
+            .build()
+        )
+    else:
+        print("Printing result to stdout. Use --output to specify output path.")
+        ds.print()
+
+    # submit for execution
+    env.execute()
diff --git a/flink-python/pyflink/examples/datastream/windowing/sliding_time_window.py b/flink-python/pyflink/examples/datastream/windowing/sliding_time_window.py
new file mode 100644
index 0000000..de3343c
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/windowing/sliding_time_window.py
@@ -0,0 +1,97 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+
+import argparse
+from typing import Iterable
+
+from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy
+
+from pyflink.common import Types, WatermarkStrategy, Time, Encoder
+from pyflink.common.watermark_strategy import TimestampAssigner
+from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
+from pyflink.datastream.window import SlidingEventTimeWindows, TimeWindow
+
+
+class MyTimestampAssigner(TimestampAssigner):
+    def extract_timestamp(self, value, record_timestamp) -> int:
+        return int(value[1])
+
+
+class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, TimeWindow]):
+    def process(self,
+                key: str,
+                context: ProcessWindowFunction.Context[TimeWindow],
+                elements: Iterable[tuple]) -> Iterable[tuple]:
+        return [(key, context.window().start, context.window().end, len([e for e in elements]))]
+
+    def clear(self, context: ProcessWindowFunction.Context) -> None:
+        pass
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        '--output',
+        dest='output',
+        required=False,
+        help='Output file to write results to.')
+
+    argv = sys.argv[1:]
+    known_args, _ = parser.parse_known_args(argv)
+    output_path = known_args.output
+
+    env = StreamExecutionEnvironment.get_execution_environment()
+    # write all the data to one file
+    env.set_parallelism(1)
+
+    # define the source
+    data_stream = env.from_collection([
+        ('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 5), ('hi', 8), ('hi', 9), ('hi', 15)],
+        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
+
+    # define the watermark strategy
+    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
+        .with_timestamp_assigner(MyTimestampAssigner())
+
+    ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
+        .key_by(lambda x: x[0], key_type=Types.STRING()) \
+        .window(SlidingEventTimeWindows.of(Time.milliseconds(5), Time.milliseconds(2))) \
+        .process(CountWindowProcessFunction(),
+                 Types.TUPLE([Types.STRING(), Types.INT(), Types.INT(), Types.INT()]))
+
+    # define the sink
+    if output_path is not None:
+        ds.sink_to(
+            sink=FileSink.for_row_format(
+                base_path=output_path,
+                encoder=Encoder.simple_string_encoder())
+            .with_output_file_config(
+                OutputFileConfig.builder()
+                .with_part_prefix("prefix")
+                .with_part_suffix(".ext")
+                .build())
+            .with_rolling_policy(RollingPolicy.default_rolling_policy())
+            .build()
+        )
+    else:
+        print("Printing result to stdout. Use --output to specify output path.")
+        ds.print()
+
+    # submit for execution
+    env.execute()
diff --git a/flink-python/pyflink/examples/datastream/windowing/tumbling_count_window.py b/flink-python/pyflink/examples/datastream/windowing/tumbling_count_window.py
new file mode 100644
index 0000000..0847576
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/windowing/tumbling_count_window.py
@@ -0,0 +1,82 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+
+import argparse
+from typing import Iterable
+
+from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy
+
+from pyflink.common import Types, Encoder
+from pyflink.datastream import StreamExecutionEnvironment, WindowFunction
+from pyflink.datastream.window import CountWindow
+
+
+class SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):
+    def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):
+        result = 0
+        for i in inputs:
+            result += i[0]
+        return [(key, result)]
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        '--output',
+        dest='output',
+        required=False,
+        help='Output file to write results to.')
+
+    argv = sys.argv[1:]
+    known_args, _ = parser.parse_known_args(argv)
+    output_path = known_args.output
+
+    env = StreamExecutionEnvironment.get_execution_environment()
+    # write all the data to one file
+    env.set_parallelism(1)
+
+    # define the source
+    data_stream = env.from_collection([
+        (1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'), (6, 'hello')],
+        type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
+
+    ds = data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
+        .count_window(2) \
+        .apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()]))
+
+    # define the sink
+    if output_path is not None:
+        ds.sink_to(
+            sink=FileSink.for_row_format(
+                base_path=output_path,
+                encoder=Encoder.simple_string_encoder())
+            .with_output_file_config(
+                OutputFileConfig.builder()
+                .with_part_prefix("prefix")
+                .with_part_suffix(".ext")
+                .build())
+            .with_rolling_policy(RollingPolicy.default_rolling_policy())
+            .build()
+        )
+    else:
+        print("Printing result to stdout. Use --output to specify output path.")
+        ds.print()
+
+    # submit for execution
+    env.execute()
diff --git a/flink-python/pyflink/examples/datastream/windowing/tumbling_time_window.py b/flink-python/pyflink/examples/datastream/windowing/tumbling_time_window.py
new file mode 100644
index 0000000..50cdd01
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/windowing/tumbling_time_window.py
@@ -0,0 +1,97 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import sys
+
+import argparse
+from typing import Iterable
+
+from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy
+
+from pyflink.common import Types, WatermarkStrategy, Time, Encoder
+from pyflink.common.watermark_strategy import TimestampAssigner
+from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
+from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow
+
+
+class MyTimestampAssigner(TimestampAssigner):
+    def extract_timestamp(self, value, record_timestamp) -> int:
+        return int(value[1])
+
+
+class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, TimeWindow]):
+    def process(self,
+                key: str,
+                context: ProcessWindowFunction.Context[TimeWindow],
+                elements: Iterable[tuple]) -> Iterable[tuple]:
+        return [(key, context.window().start, context.window().end, len([e for e in elements]))]
+
+    def clear(self, context: ProcessWindowFunction.Context) -> None:
+        pass
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        '--output',
+        dest='output',
+        required=False,
+        help='Output file to write results to.')
+
+    argv = sys.argv[1:]
+    known_args, _ = parser.parse_known_args(argv)
+    output_path = known_args.output
+
+    env = StreamExecutionEnvironment.get_execution_environment()
+    # write all the data to one file
+    env.set_parallelism(1)
+
+    # define the source
+    data_stream = env.from_collection([
+        ('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 5), ('hi', 8), ('hi', 9), ('hi', 15)],
+        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
+
+    # define the watermark strategy
+    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
+        .with_timestamp_assigner(MyTimestampAssigner())
+
+    ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
+        .key_by(lambda x: x[0], key_type=Types.STRING()) \
+        .window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \
+        .process(CountWindowProcessFunction(),
+                 Types.TUPLE([Types.STRING(), Types.INT(), Types.INT(), Types.INT()]))
+
+    # define the sink
+    if output_path is not None:
+        ds.sink_to(
+            sink=FileSink.for_row_format(
+                base_path=output_path,
+                encoder=Encoder.simple_string_encoder())
+            .with_output_file_config(
+                OutputFileConfig.builder()
+                .with_part_prefix("prefix")
+                .with_part_suffix(".ext")
+                .build())
+            .with_rolling_policy(RollingPolicy.default_rolling_policy())
+            .build()
+        )
+    else:
+        print("Printing result to stdout. Use --output to specify output path.")
+        ds.print()
+
+    # submit for execution
+    env.execute()
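Usage note: each of the new examples is a self-contained script, so with PyFlink (the apache-flink package) installed it can be run directly, for example "python tumbling_time_window.py --output /tmp/window_output" (the output path here is only an illustration). The --output argument is optional; when it is omitted, the example prints its results to stdout instead of writing them through the FileSink.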