[
https://issues.apache.org/jira/browse/FLINK-28920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Huang Xingbo resolved FLINK-28920.
----------------------------------
Resolution: Done
> Release Testing: Verify Python DataStream Window
> ------------------------------------------------
>
> Key: FLINK-28920
> URL: https://issues.apache.org/jira/browse/FLINK-28920
> Project: Flink
> Issue Type: Sub-task
> Components: API / Python
> Affects Versions: 1.16.0
> Reporter: Huang Xingbo
> Assignee: Luning Wang
> Priority: Blocker
> Labels: release-testing
> Fix For: 1.16.0
>
>
> * Check out the Flink source code and build it
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python Virtual Environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source code. For more details, you can refer to the
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> {code}
> h1. Test
> * Write a Python DataStream window job in thread mode. For details of
> windows, you can refer to the
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/].
> {code:python}
> from typing import Tuple
>
> from pyflink.common import Configuration
> from pyflink.common.time import Time
> from pyflink.common.typeinfo import Types
> from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.functions import AggregateFunction
> from pyflink.datastream.window import EventTimeSessionWindows
>
>
> class SecondColumnTimestampAssigner(TimestampAssigner):
>
>     def extract_timestamp(self, value, record_timestamp) -> int:
>         return int(value[1])
>
>
> def main():
>     config = Configuration()
>     # thread mode
>     config.set_string("python.execution-mode", "thread")
>     # process mode
>     # config.set_string("python.execution-mode", "process")
>     env = StreamExecutionEnvironment.get_execution_environment(config)
>     data_stream = env.from_collection(
>         [('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
>         type_info=Types.TUPLE([Types.STRING(), Types.INT()]))  # type: DataStream
>     watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
>         .with_timestamp_assigner(SecondColumnTimestampAssigner())
>
>     class MyAggregateFunction(AggregateFunction):
>
>         def create_accumulator(self) -> Tuple[int, str]:
>             return 0, ''
>
>         def add(self, value: Tuple[str, int],
>                 accumulator: Tuple[int, str]) -> Tuple[int, str]:
>             return value[1] + accumulator[0], value[0]
>
>         def get_result(self, accumulator: Tuple[int, str]):
>             return accumulator[1], accumulator[0]
>
>         def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]):
>             return acc_a[0] + acc_b[0], acc_a[1]
>
>     ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
>         .key_by(lambda x: x[0], key_type=Types.STRING()) \
>         .window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
>         .aggregate(MyAggregateFunction(),
>                    accumulator_type=Types.TUPLE([Types.INT(), Types.STRING()]),
>                    output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
>     ds.print()
>     env.execute('test_window_aggregate_accumulator_type')
>
>
> if __name__ == '__main__':
>     main()
> {code}
> * Run the Python DataStream window job and check the results
> {code:bash}
> $ python demo.py
> {code}
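To help judge whether the printed results are plausible, the per-key session aggregation performed by the job can be mirrored in plain Python. This is only a sketch for cross-checking, not part of the release test; it assumes a 2 ms session gap (a session closes when two events of the same key are 2 ms or more apart) and that the job emits one (key, sum of the second column) pair per closed session. Note the actual print order from the Flink job may differ.

```python
# Plain-Python mirror of the session-window aggregation in the job above,
# used only as a rough sanity check of the expected (key, sum) pairs.
from collections import defaultdict
from typing import List, Tuple


def session_sums(events: List[Tuple[str, int]], gap: int = 2) -> List[Tuple[str, int]]:
    # Group event timestamps by key.
    by_key = defaultdict(list)
    for key, ts in events:
        by_key[key].append(ts)
    results = []
    for key, stamps in sorted(by_key.items()):
        stamps.sort()
        session = [stamps[0]]
        for ts in stamps[1:]:
            if ts - session[-1] < gap:   # still within the session gap
                session.append(ts)
            else:                        # gap exceeded: close the session
                results.append((key, sum(session)))
                session = [ts]
        results.append((key, sum(session)))
    return results


events = [('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)]
print(session_sums(events))  # [('a', 3), ('a', 6), ('a', 15), ('b', 3), ('b', 17)]
```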
--
This message was sent by Atlassian Jira
(v8.20.10#820010)