HuangXingBo commented on a change in pull request #16777:
URL: https://github.com/apache/flink/pull/16777#discussion_r686588224
##########
File path: flink-python/pyflink/datastream/stream_execution_environment.py
##########
@@ -736,8 +736,10 @@ def execute(self, job_name: str = None) ->
JobExecutionResult:
:return: The result of the job execution, containing elapsed time and
accumulators.
"""
+ gateway = get_gateway()
Review comment:
What about moving these logic into the method `_generate_stream_graph`
##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_fast.pxd
##########
@@ -19,9 +19,33 @@
from apache_beam.coders.coder_impl cimport StreamCoderImpl
from apache_beam.runners.worker.operations cimport Operation
+from apache_beam.utils.windowed_value cimport WindowedValue
+
+from pyflink.fn_execution.coder_impl_fast cimport InputStreamWrapper
+
+cdef class InputProcessor:
+ cpdef has_next(self)
+ cpdef next(self)
+
+cdef class NetworkInputProcessor(InputProcessor):
+ cdef InputStreamWrapper _input_stream_wrapper
+
+cdef class IntermediateInputProcessor(InputProcessor):
+ cdef object _input_values
+ cdef object _next_value
+
+cdef class OutputProcessor:
+ cpdef process_outputs(self, WindowedValue windowed_value, results)
+
+cdef class NetworkOutputProcessor(OutputProcessor):
+ cdef Operation _consumer
Review comment:
move the attribute `Operation _consumer` to the parent class
`OutputProcessor` ?
##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_fast.pyx
##########
@@ -19,15 +19,85 @@
# cython: infer_types = True
# cython: profile=True
# cython: boundscheck=False, wraparound=False, initializedcheck=False,
cdivision=True
+from apache_beam.runners.worker.bundle_processor import DataOutputOperation
Review comment:
move this `import` statement to the back of these `cimport` statement
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]