[ https://issues.apache.org/jira/browse/FLINK-31272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Danny Cranmer updated FLINK-31272:
----------------------------------
Fix Version/s: 1.15.5
(was: 1.15.4)
> Duplicate operators appear in the StreamGraph for Python DataStream API jobs
> ----------------------------------------------------------------------------
>
> Key: FLINK-31272
> URL: https://issues.apache.org/jira/browse/FLINK-31272
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.15.0
> Reporter: Dian Fu
> Assignee: Dian Fu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0, 1.16.2, 1.15.5
>
>
> For the following job:
> {code}
> import argparse
> import json
> import sys
> import time
> from typing import Iterable, cast
> from pyflink.common import Types, Time, Encoder
> from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction, \
>     EmbeddedRocksDBStateBackend, PredefinedOptions, FileSystemCheckpointStorage, \
>     CheckpointingMode, ExternalizedCheckpointCleanup
> from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy, OutputFileConfig
> from pyflink.datastream.state import ReducingState, ReducingStateDescriptor
> from pyflink.datastream.window import TimeWindow, Trigger, TriggerResult, T, \
>     TumblingProcessingTimeWindows, ProcessingTimeTrigger
> class CountWithProcessTimeoutTrigger(ProcessingTimeTrigger):
>
>     def __init__(self, window_size: int):
>         self._window_size = window_size
>         self._count_state_descriptor = ReducingStateDescriptor(
>             "count", lambda a, b: a + b, Types.LONG())
>
>     @staticmethod
>     def of(window_size: int) -> 'CountWithProcessTimeoutTrigger':
>         return CountWithProcessTimeoutTrigger(window_size)
>
>     def on_element(self,
>                    element: T,
>                    timestamp: int,
>                    window: TimeWindow,
>                    ctx: 'Trigger.TriggerContext') -> TriggerResult:
>         count_state = cast(ReducingState, ctx.get_partitioned_state(self._count_state_descriptor))
>         count_state.add(1)
>         # print("element arrive:", element, "count_state:", count_state.get(),
>         #       window.max_timestamp(), ctx.get_current_watermark())
>         if count_state.get() >= self._window_size:  # must fire & purge!
>             print("fire element count", element, count_state.get(),
>                   window.max_timestamp(), ctx.get_current_watermark())
>             count_state.clear()
>             return TriggerResult.FIRE_AND_PURGE
>         if timestamp >= window.end:
>             count_state.clear()
>             return TriggerResult.FIRE_AND_PURGE
>         else:
>             return TriggerResult.CONTINUE
>
>     def on_processing_time(self,
>                            timestamp: int,
>                            window: TimeWindow,
>                            ctx: 'Trigger.TriggerContext') -> TriggerResult:
>         if timestamp >= window.end:
>             return TriggerResult.CONTINUE
>         else:
>             print("fire with process_time:", timestamp)
>             count_state = cast(ReducingState, ctx.get_partitioned_state(self._count_state_descriptor))
>             count_state.clear()
>             return TriggerResult.FIRE_AND_PURGE
>
>     def on_event_time(self,
>                       timestamp: int,
>                       window: TimeWindow,
>                       ctx: 'Trigger.TriggerContext') -> TriggerResult:
>         return TriggerResult.CONTINUE
>
>     def clear(self,
>               window: TimeWindow,
>               ctx: 'Trigger.TriggerContext') -> None:
>         count_state = ctx.get_partitioned_state(self._count_state_descriptor)
>         count_state.clear()
> def to_dict_map(v):
>     time.sleep(1)
>     dict_value = json.loads(v)
>     return dict_value
>
> def get_group_key(value, keys):
>     group_key_values = []
>     for key in keys:
>         one_key_value = 'null'
>         if key in value:
>             list_value = value[key]
>             if list_value:
>                 one_key_value = str(list_value[0])
>         group_key_values.append(one_key_value)
>     group_key = '_'.join(group_key_values)
>     # print("group_key=", group_key)
>     return group_key
> class CountWindowProcessFunction(ProcessWindowFunction[dict, dict, str, TimeWindow]):
>
>     def __init__(self, uf):
>         self._user_function = uf
>
>     def process(self,
>                 key: str,
>                 context: ProcessWindowFunction.Context[TimeWindow],
>                 elements: Iterable[dict]) -> Iterable[dict]:
>         result_list = self._user_function.process_after_group_by_function(elements)
>         return result_list
> if __name__ == '__main__':
>     parser = argparse.ArgumentParser()
>     parser.add_argument(
>         '--output',
>         dest='output',
>         required=False,
>         help='Output file to write results to.')
>     argv = sys.argv[1:]
>     known_args, _ = parser.parse_known_args(argv)
>     output_path = known_args.output
>
>     env = StreamExecutionEnvironment.get_execution_environment()
>     # write all the data to one file
>     env.set_parallelism(1)
>     # process time
>     env.get_config().set_auto_watermark_interval(0)
>
>     state_backend = EmbeddedRocksDBStateBackend(True)
>     state_backend.set_predefined_options(PredefinedOptions.FLASH_SSD_OPTIMIZED)
>     env.set_state_backend(state_backend)
>
>     config = env.get_checkpoint_config()
>     # config.set_checkpoint_storage(FileSystemCheckpointStorage("hdfs://ha-nn-uri/tmp/checkpoint/"))
>     config.set_checkpoint_storage(FileSystemCheckpointStorage("file:///Users/10030122/Downloads/pyflink_checkpoint/"))
>     config.set_checkpointing_mode(CheckpointingMode.AT_LEAST_ONCE)
>     config.set_checkpoint_interval(5000)
>     config.set_externalized_checkpoint_cleanup(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
>     # define the source
>     data_stream1 = env.from_collection([
>         '{"user_id": ["0"], "goods_id": [0,0]}',
>         '{"user_id": ["1"], "goods_id": [1,0]}',
>         '{"user_id": ["2"], "goods_id": [2,0]}',
>         '{"user_id": ["1"], "goods_id": [3,0]}',
>         '{"user_id": ["2"], "goods_id": [4,0]}',
>         '{"user_id": ["1"], "goods_id": [5,0]}',
>         '{"user_id": ["2"], "goods_id": [6,0]}',
>         '{"user_id": ["1"], "goods_id": [7,0]}',
>         '{"user_id": ["2"], "goods_id": [8,0]}',
>         '{"user_id": ["1"], "goods_id": [9,0]}',
>         '{"user_id": ["2"], "goods_id": [10,0]}',
>         '{"user_id": ["1"], "goods_id": [11,0]}',
>         '{"user_id": ["2"], "goods_id": [12,0]}',
>         '{"user_id": ["1"], "goods_id": [13,0]}',
>         '{"user_id": ["2"], "goods_id": [14,0]}',
>         '{"user_id": ["1"], "goods_id": [15,0]}',
>         '{"user_id": ["2"], "goods_id": [16,0]}',
>         '{"user_id": ["1"], "goods_id": [17,0]}',
>         '{"user_id": ["2"], "goods_id": [18,0]}',
>         '{"user_id": ["1"], "goods_id": [19,0]}',
>         '{"user_id": ["2"], "goods_id": [20,0]}',
>         '{"user_id": ["1"], "goods_id": [21,0]}',
>         '{"user_id": ["2"], "goods_id": [22,0]}',
>         '{"user_id": ["1"], "goods_id": [23,0]}',
>         '{"user_id": ["2"], "goods_id": [24,0]}',
>         '{"user_id": ["1"], "goods_id": [25,0]}',
>         '{"user_id": ["2"], "goods_id": [26,0]}',
>         '{"user_id": ["1"], "goods_id": [27,0]}',
>         '{"user_id": ["2"], "goods_id": [28,0]}',
>         '{"user_id": ["1"], "goods_id": [29,0]}',
>         '{"user_id": ["2"], "goods_id": [30,0]}'])
>     data_stream2 = env.from_collection([
>         '{"user_id": ["0"], "goods_id": [0,0]}',
>         '{"user_id": ["1"], "goods_id": [1,0]}',
>         '{"user_id": ["2"], "goods_id": [2,0]}',
>         '{"user_id": ["1"], "goods_id": [3,0]}',
>         '{"user_id": ["2"], "goods_id": [4,0]}',
>         '{"user_id": ["1"], "goods_id": [5,0]}',
>         '{"user_id": ["2"], "goods_id": [6,0]}',
>         '{"user_id": ["1"], "goods_id": [7,0]}',
>         '{"user_id": ["2"], "goods_id": [8,0]}',
>         '{"user_id": ["1"], "goods_id": [9,0]}',
>         '{"user_id": ["2"], "goods_id": [10,0]}',
>         '{"user_id": ["1"], "goods_id": [11,0]}',
>         '{"user_id": ["2"], "goods_id": [12,0]}',
>         '{"user_id": ["1"], "goods_id": [13,0]}',
>         '{"user_id": ["2"], "goods_id": [14,0]}',
>         '{"user_id": ["1"], "goods_id": [15,0]}',
>         '{"user_id": ["2"], "goods_id": [16,0]}',
>         '{"user_id": ["1"], "goods_id": [17,0]}',
>         '{"user_id": ["2"], "goods_id": [18,0]}',
>         '{"user_id": ["1"], "goods_id": [19,0]}',
>         '{"user_id": ["2"], "goods_id": [20,0]}',
>         '{"user_id": ["1"], "goods_id": [21,0]}',
>         '{"user_id": ["2"], "goods_id": [22,0]}',
>         '{"user_id": ["1"], "goods_id": [23,0]}',
>         '{"user_id": ["2"], "goods_id": [24,0]}',
>         '{"user_id": ["1"], "goods_id": [25,0]}',
>         '{"user_id": ["2"], "goods_id": [26,0]}',
>         '{"user_id": ["1"], "goods_id": [27,0]}',
>         '{"user_id": ["2"], "goods_id": [28,0]}',
>         '{"user_id": ["1"], "goods_id": [29,0]}',
>         '{"user_id": ["2"], "goods_id": [30,0]}'])
>     # group_keys = ['user_id', 'goods_id']
>     group_keys = ['user_id']
>     sink_to_file_flag = True
>     data_stream = data_stream1.union(data_stream2)
>
>     # user_function = __import__("UserFunction")
>     ds = data_stream.map(lambda v: to_dict_map(v)) \
>         .filter(lambda v: v) \
>         .map(lambda v: v) \
>         .key_by(lambda v: get_group_key(v, group_keys)) \
>         .window(TumblingProcessingTimeWindows.of(Time.seconds(12))) \
>         .process(CountWindowProcessFunction(lambda v: v), Types.STRING())
>     ds = ds.map(lambda v: v, Types.PRIMITIVE_ARRAY(Types.BYTE()))
>
>     base_path = "/tmp/1.txt"
>     encoder = Encoder.simple_string_encoder()
>     file_sink_builder = FileSink.for_row_format(base_path, encoder)
>     file_sink = file_sink_builder \
>         .with_bucket_check_interval(1000) \
>         .with_rolling_policy(RollingPolicy.on_checkpoint_rolling_policy()) \
>         .with_output_file_config(
>             OutputFileConfig.builder().with_part_prefix("pre").with_part_suffix("suf").build()) \
>         .build()
>     ds.sink_to(file_sink)
>
>     # submit for execution
>     env.execute()
> {code}
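> The plan below can be obtained before the job is submitted, for example by printing it right before env.execute(). This is a minimal sketch for illustration, not part of the reporter's job:
> {code}
> # Hypothetical addition: dump the StreamGraph as a JSON string before executing.
> # The output has the same structure as the plan shown below.
> print(env.get_execution_plan())
> env.execute()
> {code}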
> The stream graph is as follows:
> {code}
> {
>   "nodes" : [ {
>     "id" : 1,
>     "type" : "Source: Collection Source",
>     "pact" : "Data Source",
>     "contents" : "Source: Collection Source",
>     "parallelism" : 1
>   }, {
>     "id" : 2,
>     "type" : "Source: Collection Source",
>     "pact" : "Data Source",
>     "contents" : "Source: Collection Source",
>     "parallelism" : 1
>   }, {
>     "id" : 9,
>     "type" : "TumblingProcessingTimeWindows",
>     "pact" : "Operator",
>     "contents" : "Window(TumblingProcessingTimeWindows(12000, 0), ProcessingTimeTrigger, CountWindowProcessFunction)",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 15,
>       "ship_strategy" : "HASH",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 10,
>     "type" : "Map",
>     "pact" : "Operator",
>     "contents" : "Map",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 9,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 15,
>     "type" : "Map, Filter, Map, _stream_key_by_map_operator",
>     "pact" : "Operator",
>     "contents" : "Map, Filter, Map, _stream_key_by_map_operator",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 1,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     }, {
>       "id" : 2,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 16,
>     "type" : "TumblingProcessingTimeWindows, Map",
>     "pact" : "Operator",
>     "contents" : "Window(TumblingProcessingTimeWindows(12000, 0), ProcessingTimeTrigger, CountWindowProcessFunction)",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 15,
>       "ship_strategy" : "HASH",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 18,
>     "type" : "Sink: Writer",
>     "pact" : "Operator",
>     "contents" : "Sink: Writer",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 10,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 20,
>     "type" : "Sink: Committer",
>     "pact" : "Operator",
>     "contents" : "Sink: Committer",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 18,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   } ]
> }
> {code}
> The plan is incorrect: the TumblingProcessingTimeWindows operator appears twice (nodes 9 and 16), even though the job defines only a single window operation.
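> The duplication can also be confirmed programmatically by counting the window nodes in the dumped plan. The sketch below is illustrative only (the helper name and the plan_json variable are made up here); it assumes the JSON above was captured, e.g. via env.get_execution_plan():
> {code}
> import json
>
> def count_window_nodes(plan_json: str) -> int:
>     """Count plan nodes whose type mentions TumblingProcessingTimeWindows."""
>     plan = json.loads(plan_json)
>     return sum(1 for node in plan["nodes"]
>                if "TumblingProcessingTimeWindows" in node["type"])
>
> # plan_json = env.get_execution_plan()  # captured before env.execute()
> # print(count_window_nodes(plan_json))  # expected 1, but the plan above yields 2
> {code}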
--
This message was sent by Atlassian Jira
(v8.20.10#820010)