This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new e3d612e7e98 [FLINK-31690][python] Fix KeyedCoProcessFunction to set 
the current key into the context
e3d612e7e98 is described below

commit e3d612e7e98bedde42c365df3f2ed2a2ca76aefa
Author: Dian Fu <[email protected]>
AuthorDate: Mon Apr 3 11:57:52 2023 +0800

    [FLINK-31690][python] Fix KeyedCoProcessFunction to set the current key 
into the context
    
    This closes #22323.
---
 flink-python/pyflink/datastream/tests/test_data_stream.py        | 9 +++++----
 .../pyflink/fn_execution/datastream/process/operations.py        | 4 +++-
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py 
b/flink-python/pyflink/datastream/tests/test_data_stream.py
index 64f18f5cc83..fc08964976f 100644
--- a/flink-python/pyflink/datastream/tests/test_data_stream.py
+++ b/flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -676,25 +676,26 @@ class DataStreamTests(object):
                 )
 
             def process_element1(self, value, ctx: 
'KeyedCoProcessFunction.Context'):
-                yield value[1]
+                yield ctx.get_current_key(), value[1]
                 self.reducing_state.add(1)
                 yield tag, self.reducing_state.get()
 
             def process_element2(self, value, ctx: 
'KeyedCoProcessFunction.Context'):
-                yield value[0]
+                yield ctx.get_current_key(), value[0]
                 self.reducing_state.add(1)
                 yield tag, self.reducing_state.get()
 
         ds3 = ds1.key_by(lambda e: e[0])\
             .connect(ds2.key_by(lambda e: e[1]))\
-            .process(MyKeyedCoProcessFunction(), output_type=Types.INT())
+            .process(MyKeyedCoProcessFunction(),
+                     output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
         main_sink = DataStreamTestSinkFunction()
         ds3.add_sink(main_sink)
         side_sink = DataStreamTestSinkFunction()
         ds3.get_side_output(tag).add_sink(side_sink)
 
         self.env.execute("test_keyed_co_process_side_output")
-        main_expected = ['1', '2', '3', '4', '5', '6', '7', '8']
+        main_expected = ['(a,1)', '(b,2)', '(a,3)', '(b,4)', '(b,5)', '(a,6)', 
'(b,7)', '(a,8)']
         self.assert_equals_sorted(main_expected, main_sink.get_results())
         side_expected = ['1', '1', '2', '2', '3', '3', '4', '4']
         self.assert_equals_sorted(side_expected, side_sink.get_results())
diff --git a/flink-python/pyflink/fn_execution/datastream/process/operations.py 
b/flink-python/pyflink/fn_execution/datastream/process/operations.py
index 13258180238..0844bb1dd3e 100644
--- a/flink-python/pyflink/fn_execution/datastream/process/operations.py
+++ b/flink-python/pyflink/fn_execution/datastream/process/operations.py
@@ -336,7 +336,9 @@ def extract_stateful_function(
                     user_input = normal_data[2]
 
                 ctx.set_timestamp(timestamp)
-                on_timer_ctx.set_current_key(user_key_selector(user_input))
+                current_user_key = user_key_selector(user_input)
+                ctx.set_current_key(current_user_key)
+                on_timer_ctx.set_current_key(current_user_key)
                 
keyed_state_backend.set_current_key(state_key_selector(user_input))
 
                 if is_left:

Reply via email to