[
https://issues.apache.org/jira/browse/FLINK-22124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dian Fu updated FLINK-22124:
----------------------------
Description:
For the following job:
{code}
import logging
from pyflink.common import WatermarkStrategy, Row
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FileSink, OutputFileConfig,
NumberSequenceSource
from pyflink.datastream.execution_mode import RuntimeExecutionMode
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import MapStateDescriptor
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(2)
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
seq_num_source = NumberSequenceSource(1, 1000)
file_sink = FileSink \
.for_row_format('/Users/dianfu/code/src/apache/playgrounds/examples/output/data_stream_batch_state',
Encoder.simple_string_encoder()) \
.with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build())
\
.build()
ds = env.from_source(
source=seq_num_source,
watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
source_name='file_source',
type_info=Types.LONG())
class MyKeyedProcessFunction(KeyedProcessFunction):
def __init__(self):
self.state = None
def open(self, runtime_context: RuntimeContext):
logging.info("open")
state_desc = MapStateDescriptor('map', Types.LONG(), Types.LONG())
self.state = runtime_context.get_map_state(state_desc)
def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
existing = self.state.get(value[0])
if existing is None:
result = value[1]
self.state.put(value[0], result)
elif existing <= 10:
result = value[1] + existing
self.state.put(value[0], result)
else:
result = existing
yield result
ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(),
Types.LONG()])) \
.key_by(lambda a: a[0]) \
.process(MyKeyedProcessFunction(), Types.LONG()) \
.sink_to(file_sink)
env.execute('data_stream_batch_state')
{code}
`self.state.get(value[0])` raises a KeyError if value[0] doesn't exist in the
state, yet the job finishes without any error message. This issue should be
addressed: we should make sure the error message appears in the log file to
help users figure out what happened.
was:
For the following job:
{code}
import logging
from pyflink.common import WatermarkStrategy, Row
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FileSink, OutputFileConfig,
NumberSequenceSource
from pyflink.datastream.execution_mode import RuntimeExecutionMode
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import MapStateDescriptor
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(2)
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
seq_num_source = NumberSequenceSource(1, 1000)
file_sink = FileSink \
.for_row_format('/Users/dianfu/code/src/apache/playgrounds/examples/output/data_stream_batch_state',
Encoder.simple_string_encoder()) \
.with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build())
\
.build()
ds = env.from_source(
source=seq_num_source,
watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
source_name='file_source',
type_info=Types.LONG())
class MyKeyedProcessFunction(KeyedProcessFunction):
def __init__(self):
self.state = None
def open(self, runtime_context: RuntimeContext):
logging.info("open")
state_desc = MapStateDescriptor('map', Types.LONG(), Types.LONG())
self.state = runtime_context.get_map_state(state_desc)
def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
existing = self.state.get(value[0])
if existing is None:
result = value[1]
self.state.put(value[0], result)
elif existing <= 10:
result = value[1] + existing
self.state.put(value[0], result)
else:
result = existing
yield result
ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(),
Types.LONG()])) \
.key_by(lambda a: a[0]) \
.process(MyKeyedProcessFunction(), Types.LONG()) \
.sink_to(file_sink)
env.execute('data_stream_batch_state')
{code}
`self.state.get(value[0])` raises a KeyError, yet the job finishes without any
error message. This issue should be addressed: we should make sure the error
message appears in the log file to help users figure out what happened.
> The job finished without any exception if error was thrown during state access
> ------------------------------------------------------------------------------
>
> Key: FLINK-22124
> URL: https://issues.apache.org/jira/browse/FLINK-22124
> Project: Flink
> Issue Type: Sub-task
> Components: API / Python
> Affects Versions: 1.13.0
> Reporter: Dian Fu
> Priority: Major
> Fix For: 1.13.0
>
>
> For the following job:
> {code}
> import logging
> from pyflink.common import WatermarkStrategy, Row
> from pyflink.common.serialization import Encoder
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors import FileSink, OutputFileConfig,
> NumberSequenceSource
> from pyflink.datastream.execution_mode import RuntimeExecutionMode
> from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
> from pyflink.datastream.state import MapStateDescriptor
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(2)
> env.set_runtime_mode(RuntimeExecutionMode.BATCH)
> seq_num_source = NumberSequenceSource(1, 1000)
> file_sink = FileSink \
>
> .for_row_format('/Users/dianfu/code/src/apache/playgrounds/examples/output/data_stream_batch_state',
> Encoder.simple_string_encoder()) \
>
> .with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build())
> \
> .build()
> ds = env.from_source(
> source=seq_num_source,
> watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
> source_name='file_source',
> type_info=Types.LONG())
> class MyKeyedProcessFunction(KeyedProcessFunction):
> def __init__(self):
> self.state = None
> def open(self, runtime_context: RuntimeContext):
> logging.info("open")
> state_desc = MapStateDescriptor('map', Types.LONG(), Types.LONG())
> self.state = runtime_context.get_map_state(state_desc)
> def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
> existing = self.state.get(value[0])
> if existing is None:
> result = value[1]
> self.state.put(value[0], result)
> elif existing <= 10:
> result = value[1] + existing
> self.state.put(value[0], result)
> else:
> result = existing
> yield result
> ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(),
> Types.LONG()])) \
> .key_by(lambda a: a[0]) \
> .process(MyKeyedProcessFunction(), Types.LONG()) \
> .sink_to(file_sink)
> env.execute('data_stream_batch_state')
> {code}
> `self.state.get(value[0])` raises a KeyError if value[0] doesn't exist in
> the state, yet the job finishes without any error message. This issue should
> be addressed: we should make sure the error message appears in the log file
> to help users figure out what happened.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)