[
https://issues.apache.org/jira/browse/FLINK-22124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-22124:
-----------------------------------
Labels: pull-request-available (was: )
> The job finished without any exception if error was thrown during state access
> ------------------------------------------------------------------------------
>
> Key: FLINK-22124
> URL: https://issues.apache.org/jira/browse/FLINK-22124
> Project: Flink
> Issue Type: Sub-task
> Components: API / Python
> Affects Versions: 1.13.0
> Reporter: Dian Fu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.13.0
>
>
> For the following job:
> {code}
> import logging
>
> from pyflink.common import WatermarkStrategy, Row
> from pyflink.common.serialization import Encoder
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors import FileSink, OutputFileConfig, NumberSequenceSource
> from pyflink.datastream.execution_mode import RuntimeExecutionMode
> from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
> from pyflink.datastream.state import MapStateDescriptor
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(2)
> env.set_runtime_mode(RuntimeExecutionMode.BATCH)
>
> seq_num_source = NumberSequenceSource(1, 1000)
>
> file_sink = FileSink \
>     .for_row_format('/Users/dianfu/code/src/apache/playgrounds/examples/output/data_stream_batch_state',
>                     Encoder.simple_string_encoder()) \
>     .with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build()) \
>     .build()
>
> ds = env.from_source(
>     source=seq_num_source,
>     watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
>     source_name='file_source',
>     type_info=Types.LONG())
>
>
> class MyKeyedProcessFunction(KeyedProcessFunction):
>
>     def __init__(self):
>         self.state = None
>
>     def open(self, runtime_context: RuntimeContext):
>         logging.info("open")
>         state_desc = MapStateDescriptor('map', Types.LONG(), Types.LONG())
>         self.state = runtime_context.get_map_state(state_desc)
>
>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>         # expects get() to return None for a missing key; a KeyError is raised instead
>         existing = self.state.get(value[0])
>         if existing is None:
>             result = value[1]
>             self.state.put(value[0], result)
>         elif existing <= 10:
>             result = value[1] + existing
>             self.state.put(value[0], result)
>         else:
>             result = existing
>         yield result
>
>
> ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
>     .key_by(lambda a: a[0]) \
>     .process(MyKeyedProcessFunction(), Types.LONG()) \
>     .sink_to(file_sink)
>
> env.execute('data_stream_batch_state')
> {code}
> Since `self.state.get(value[0])` raises a KeyError when value[0] does not exist in the state, the job should fail; instead it finishes without any error message. This should be addressed: we should make sure the error message appears in the log file to help users figure out what happened.
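> Until this is fixed, a possible workaround in the user function is to guard the state access explicitly. Below is a minimal sketch, assuming the current behaviour where `MapState.get()` raises a KeyError for an absent key; only `process_element` changes, the rest of the job stays as above:
> {code}
>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>         try:
>             existing = self.state.get(value[0])
>         except KeyError:
>             # treat a missing key the same as a None result
>             existing = None
>         if existing is None:
>             result = value[1]
>             self.state.put(value[0], result)
>         elif existing <= 10:
>             result = value[1] + existing
>             self.state.put(value[0], result)
>         else:
>             result = existing
>         yield result
> {code}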
--
This message was sent by Atlassian Jira
(v8.3.4#803005)