HuangXingBo commented on a change in pull request #16541:
URL: https://github.com/apache/flink/pull/16541#discussion_r673733773



##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
##########
@@ -82,17 +81,17 @@ cdef class FunctionOperation(Operation):
                     self.consumer.output_stream.maybe_flush()
             else:
                 input_stream_wrapper = o.value
-                output_stream = BeamOutputStream(self.consumer.output_stream)
                 if isinstance(self.operation, BundleOperation):
                     while input_stream_wrapper.has_next():
                         self.process_element(input_stream_wrapper.next())
                     result = self.operation.finish_bundle()
-                    self._output_coder.encode_to_stream(result, output_stream)
+                    self._value_coder_impl.encode_to_stream(

Review comment:
       Add `output_stream = self.consumer.output_stream` and the type 
definition of `output_stream`

##########
File path: flink-python/pyflink/fn_execution/beam/beam_coders.py
##########
@@ -15,6 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
+

Review comment:
       unnecessary

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
##########
@@ -132,3 +131,14 @@ cdef class StatefulFunctionOperation(FunctionOperation):
 
     cdef object generate_operation(self):
         return self.operation_cls(self.spec, self.keyed_state_backend)
+
+    cpdef void add_timer_info(self, timer_family_id, timer_info):
+        # ignore timer_family_id
+        self.operation.add_timer_info(timer_info)
+
+    cpdef process_timer(self, tag, timer_data):
+        output_stream = self.consumer.output_stream

Review comment:
       add the type definition of `output_stream`

##########
File path: flink-python/pyflink/fn_execution/coder_impl_fast.pyx
##########
@@ -175,7 +175,7 @@ cdef class LengthPrefixBaseCoderImpl:
     """
 
     def __init__(self, field_coder: FieldCoderImpl):
-        self._field_coder = field_coder  # type: FieldCoderImpl
+        self._field_coder = field_coder

Review comment:
       We need to remove the unused `from pyflink.fn_execution import 
flink_fn_execution_pb2` import, although it is unrelated to this commit.

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
##########
@@ -132,3 +131,14 @@ cdef class StatefulFunctionOperation(FunctionOperation):
 
     cdef object generate_operation(self):
         return self.operation_cls(self.spec, self.keyed_state_backend)
+
+    cpdef void add_timer_info(self, timer_family_id, timer_info):
+        # ignore timer_family_id
+        self.operation.add_timer_info(timer_info)
+
+    cpdef process_timer(self, tag, timer_data):
+        output_stream = self.consumer.output_stream
+        self._value_coder_impl.encode_to_stream(
+            # the field user_key holds the timer data
+            self.operation.process_timer(timer_data.user_key), output_stream, 
True)
+        output_stream.maybe_flush()

Review comment:
       I think we don't need to call `maybe_flush` since BeamOutputStream has 
already decided whether to flush after every `write`

##########
File path: flink-python/pyflink/fn_execution/datastream/timerservice_impl.py
##########
@@ -18,49 +18,30 @@
 import collections
 import time
 from enum import Enum
+from io import BytesIO
 
+from apache_beam.runners.worker.bundle_processor import TimerInfo
+from apache_beam.transforms import userstate
+from apache_beam.transforms.window import GlobalWindow
+
+from pyflink.common import Row
 from pyflink.datastream import TimerService
-from pyflink.fn_execution.datastream.timerservice import InternalTimer, K, N, 
InternalTimerService
+from pyflink.fn_execution.datastream.timerservice import N, 
InternalTimerService, InternalTimer, K

Review comment:
       unnecessary change?

##########
File path: flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx
##########
@@ -66,6 +67,8 @@ cdef class BeamOutputStream(LengthPrefixOutputStream):
             self._output_buffer_size += length + 9
             self._output_data = <char*> realloc(self._output_data,
                                                 self._output_buffer_size)

Review comment:
       format into one line

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
##########
@@ -82,17 +81,17 @@ cdef class FunctionOperation(Operation):
                     self.consumer.output_stream.maybe_flush()
             else:
                 input_stream_wrapper = o.value
-                output_stream = BeamOutputStream(self.consumer.output_stream)

Review comment:
       remove `cdef BeamOutputStream output_stream` and `from 
pyflink.fn_execution.beam.beam_stream_fast cimport BeamOutputStream`

##########
File path: flink-python/pyflink/fn_execution/coder_impl_slow.py
##########
@@ -114,7 +114,7 @@ class ValueCoderImpl(LengthPrefixBaseCoderImpl):
     def __init__(self, field_coder: 'FieldCoderImpl'):
         super(ValueCoderImpl, self).__init__(field_coder)
 
-    def encode_to_stream(self, value, out_stream):
+    def encode_to_stream(self, value, out_stream: OutputStream):

Review comment:
       What about adding type hints to all `encode_to_stream` and 
`decode_from_stream` methods?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to