dianfu commented on a change in pull request #16467:
URL: https://github.com/apache/flink/pull/16467#discussion_r686617600



##########
File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_fast.pyx
##########
@@ -94,10 +94,13 @@ cdef class FlinkLengthPrefixCoderBeamWrapper(StreamCoderImpl):
     """
     def __cinit__(self, value_coder):
         self._value_coder = value_coder
+        self._output_stream = None
 
     cpdef encode_to_stream(self, value, BOutputStream out_stream, bint nested):
-        output_stream = BeamOutputStream(out_stream)
-        self._value_coder.encode_to_stream(value, output_stream)
+        if not self._output_stream:
+            self._output_stream = BeamTimeBasedOutputStream()

Review comment:
       Why not initialize **_output_stream** in the constructor?
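
For illustration only, a minimal sketch of the eager alternative. `TimedStream` and `Wrapper` are hypothetical stand-ins, not the PyFlink classes, and the sketch assumes that creating the stream at construction time has no unwanted side effects.

```python
class TimedStream:
    """Hypothetical stand-in for the time-based output stream under review."""

    def __init__(self):
        self.chunks = []

    def write(self, data: bytes) -> None:
        self.chunks.append(data)


class Wrapper:
    """Hypothetical stand-in for the coder wrapper."""

    def __init__(self, value_coder):
        self._value_coder = value_coder
        # Created once, up front, instead of lazily on the first encode call.
        self._output_stream = TimedStream()

    def encode_to_stream(self, value) -> None:
        # No "if not self._output_stream" branch is needed on the hot path.
        self._output_stream.write(self._value_coder(value))


wrapper = Wrapper(lambda v: str(v).encode("utf-8"))
wrapper.encode_to_stream(1)
wrapper.encode_to_stream(2)
```

With the stream created in the constructor, every call to `encode_to_stream` can rely on `_output_stream` being set.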

##########
File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py
##########
@@ -58,4 +58,25 @@ def decode_from_stream(self, in_stream: create_InputStream, nested):
         return self._value_coder.decode_from_stream(data_input_stream)
 
     def __repr__(self):
-        return 'FlinkCoderBeamWrapper[%s]' % self._value_coder
+        return 'FlinkFieldCoderBeamWrapper[%s]' % self._value_coder
+
+
+class FlinkLengthPrefixCoderBeamWrapper(FlinkFieldCoderBeamWrapper):
+    """
+    Bridge between Beam coder and Flink coder for the top-level LengthPrefixCoder.
+    """
+    def __init__(self, value_coder):
+        super(FlinkLengthPrefixCoderBeamWrapper, self).__init__(value_coder)
+        self._output_stream = None

Review comment:
       Could we give it a more meaningful name? It's difficult to differentiate between _output_stream and out_stream.

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -317,9 +321,41 @@ private void checkInvokeFinishBundleByTime() throws Exception {
 
     protected void invokeFinishBundle() throws Exception {
         if (elementCount > 0) {
-            pythonFunctionRunner.flush();
-            elementCount = 0;
+            AtomicBoolean flushThreadFinish = new AtomicBoolean(false);
+            CountDownLatch flushThreadStart = new CountDownLatch(1);
+            AtomicReference<Exception> exceptionReference = new AtomicReference<>();
+            Thread flushThread =

Review comment:
       It would be better to create a single-thread executor for this, e.g. Executors.newSingleThreadExecutor. That would avoid creating a new thread for each bundle.

##########
File path: flink-python/pyflink/fn_execution/beam/beam_stream_slow.py
##########
@@ -33,3 +34,31 @@ def read_byte(self):
 
     def size(self):
         return self._input_stream.size()
+
+
+class BeamTimeBasedOutputStream(create_OutputStream):
+    def __init__(self):
+        super(BeamTimeBasedOutputStream).__init__()
+        self._flush_event = False
+        self._periodic_flusher = PeriodicThread(1, self.notify_flush)
+        self._periodic_flusher.daemon = True
+        self._periodic_flusher.start()
+        self._output_stream = None
+
+    def write(self, b: bytes):
+        self._output_stream.write(b)
+        if self._flush_event:
+            self._output_stream.flush()
+            self._flush_event = False
+
+    def parse_output_stream(self, output_stream: create_OutputStream):
+        self._output_stream = output_stream
+
+    def notify_flush(self):
+        if not self._flush_event:

Review comment:
       ```suggestion
           if not self._flush_event:
   ```
   I guess this line could be removed
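
A small sketch of why the guard is redundant, assuming the guarded statement is `self._flush_event = True` (as in the Cython version of this class elsewhere in the PR). `FlushFlag` and `final_state` are hypothetical names used only for this illustration.

```python
class FlushFlag:
    """Hypothetical stand-in that keeps only the flag from the class under review."""

    def __init__(self):
        self._flush_event = False

    def notify_flush_guarded(self):
        # Shape of the code in the diff: check first, then set.
        if not self._flush_event:
            self._flush_event = True

    def notify_flush(self):
        # Unconditional assignment; setting True a second time changes nothing.
        self._flush_event = True


def final_state(method_name, calls):
    """Call the named method `calls` times and return the resulting flag."""
    flag = FlushFlag()
    for _ in range(calls):
        getattr(flag, method_name)()
    return flag._flush_event


# Compare both variants after 0, 1 and 2 notifications.
results = [
    (final_state("notify_flush_guarded", n), final_state("notify_flush", n))
    for n in range(3)
]
```

Both variants leave the flag in the same state for any number of calls, so the check only adds a branch.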

##########
File path: flink-python/pyflink/fn_execution/beam/beam_stream_slow.py
##########
@@ -33,3 +34,31 @@ def read_byte(self):
 
     def size(self):
         return self._input_stream.size()
+
+
+class BeamTimeBasedOutputStream(create_OutputStream):
+    def __init__(self):
+        super(BeamTimeBasedOutputStream).__init__()
+        self._flush_event = False
+        self._periodic_flusher = PeriodicThread(1, self.notify_flush)
+        self._periodic_flusher.daemon = True
+        self._periodic_flusher.start()
+        self._output_stream = None
+
+    def write(self, b: bytes):
+        self._output_stream.write(b)
+        if self._flush_event:
+            self._output_stream.flush()
+            self._flush_event = False
+
+    def parse_output_stream(self, output_stream: create_OutputStream):

Review comment:
       ```suggestion
       def reset_output_stream(self, output_stream: create_OutputStream):
   ```

##########
File path: flink-python/pyflink/fn_execution/stream_fast.pyx
##########
@@ -94,6 +100,21 @@ cdef class OutputStream:
         self.buffer_size = 1024
         self.pos = 0
 
+    cpdef void write(self, bytes v):

Review comment:
       Why is it cpdef while the other methods are declared cdef?

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -317,9 +321,41 @@ private void checkInvokeFinishBundleByTime() throws Exception {
 
     protected void invokeFinishBundle() throws Exception {
         if (elementCount > 0) {
-            pythonFunctionRunner.flush();
-            elementCount = 0;
+            AtomicBoolean flushThreadFinish = new AtomicBoolean(false);
+            CountDownLatch flushThreadStart = new CountDownLatch(1);
+            AtomicReference<Exception> exceptionReference = new AtomicReference<>();
+            Thread flushThread =
+                    new Thread(
+                            () -> {
+                                try {
+                                    flushThreadStart.countDown();
+                                    pythonFunctionRunner.flush();
+                                } catch (Exception e) {
+                                    exceptionReference.set(e);
+                                } finally {
+                                    flushThreadFinish.set(true);
+                                    // interrupt the progress of takeResult in avoid of the main
+                                    // thread is locked forever.

Review comment:
       ```suggestion
                                       // thread is blocked forever.
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -317,9 +321,41 @@ private void checkInvokeFinishBundleByTime() throws Exception {
 
     protected void invokeFinishBundle() throws Exception {
         if (elementCount > 0) {
-            pythonFunctionRunner.flush();
-            elementCount = 0;
+            AtomicBoolean flushThreadFinish = new AtomicBoolean(false);
+            CountDownLatch flushThreadStart = new CountDownLatch(1);
+            AtomicReference<Exception> exceptionReference = new AtomicReference<>();
+            Thread flushThread =
+                    new Thread(
+                            () -> {
+                                try {
+                                    flushThreadStart.countDown();
+                                    pythonFunctionRunner.flush();
+                                } catch (Exception e) {
+                                    exceptionReference.set(e);
+                                } finally {
+                                    flushThreadFinish.set(true);
+                                    // interrupt the progress of takeResult in avoid of the main
+                                    // thread is locked forever.
+                                    ((BeamPythonFunctionRunner) pythonFunctionRunner)
+                                            .noEmptySignal();

Review comment:
       ```suggestion
                                               .notifyNoMoreResults();
   ```

##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -93,6 +94,7 @@ def process(self, o: WindowedValue):
                     self._value_coder_impl.encode_to_stream(
                         self.process_element(value), output_stream, True)
                     output_stream.maybe_flush()
+            self._value_coder_impl._output_stream.close()

Review comment:
       When is it opened/started?

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -317,9 +321,41 @@ private void checkInvokeFinishBundleByTime() throws Exception {
 
     protected void invokeFinishBundle() throws Exception {
         if (elementCount > 0) {
-            pythonFunctionRunner.flush();
-            elementCount = 0;
+            AtomicBoolean flushThreadFinish = new AtomicBoolean(false);
+            CountDownLatch flushThreadStart = new CountDownLatch(1);
+            AtomicReference<Exception> exceptionReference = new AtomicReference<>();
+            Thread flushThread =
+                    new Thread(
+                            () -> {
+                                try {
+                                    flushThreadStart.countDown();
+                                    pythonFunctionRunner.flush();
+                                } catch (Exception e) {
+                                    exceptionReference.set(e);
+                                } finally {
+                                    flushThreadFinish.set(true);
+                                    // interrupt the progress of takeResult in avoid of the main
+                                    // thread is locked forever.
+                                    ((BeamPythonFunctionRunner) pythonFunctionRunner)
+                                            .noEmptySignal();
+                                }
+                            });
+            flushThread.start();
+            flushThreadStart.await();

Review comment:
       Why do we need to wait for the flush thread to start?

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -317,9 +321,41 @@ private void checkInvokeFinishBundleByTime() throws Exception {
 
     protected void invokeFinishBundle() throws Exception {
         if (elementCount > 0) {
-            pythonFunctionRunner.flush();
-            elementCount = 0;
+            AtomicBoolean flushThreadFinish = new AtomicBoolean(false);
+            CountDownLatch flushThreadStart = new CountDownLatch(1);
+            AtomicReference<Exception> exceptionReference = new AtomicReference<>();
+            Thread flushThread =
+                    new Thread(
+                            () -> {
+                                try {
+                                    flushThreadStart.countDown();
+                                    pythonFunctionRunner.flush();
+                                } catch (Exception e) {
+                                    exceptionReference.set(e);
+                                } finally {
+                                    flushThreadFinish.set(true);
+                                    // interrupt the progress of takeResult in avoid of the main

Review comment:
       ```suggestion
                                       // interrupt the progress of takeResult to avoid the main
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -317,9 +321,41 @@ private void checkInvokeFinishBundleByTime() throws Exception {
 
     protected void invokeFinishBundle() throws Exception {
         if (elementCount > 0) {
-            pythonFunctionRunner.flush();
-            elementCount = 0;
+            AtomicBoolean flushThreadFinish = new AtomicBoolean(false);
+            CountDownLatch flushThreadStart = new CountDownLatch(1);
+            AtomicReference<Exception> exceptionReference = new AtomicReference<>();
+            Thread flushThread =
+                    new Thread(
+                            () -> {
+                                try {
+                                    flushThreadStart.countDown();
+                                    pythonFunctionRunner.flush();
+                                } catch (Exception e) {

Review comment:
       ```suggestion
                                   } catch (Throwable t) {
   ```

##########
File path: flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx
##########
@@ -91,12 +90,33 @@ cdef class BeamOutputStream(LengthPrefixOutputStream):
         self._output_stream.flush()
         self._output_pos = 0
 
-    cdef void _parse_output_stream(self, BOutputStream output_stream):
+    cdef void parse_output_stream(self, BOutputStream output_stream):
+        self._output_stream = output_stream
         self._output_data = output_stream.data
         self._output_pos = output_stream.pos
         self._output_buffer_size = output_stream.buffer_size
 
     cdef void _maybe_flush(self):
         if self._output_pos > 10_000_000:
-            self._output_stream.flush()
-            self._output_pos = 0
+            self.flush()
+
+cdef class BeamTimeBasedOutputStream(BeamSizeBasedOutputStream):
+    def __init__(self, *args, **kwargs):
+        self._flush_event = False
+        self._periodic_flusher = PeriodicThread(1, self.notify_flush)
+        self._periodic_flusher.daemon = True
+        self._periodic_flusher.start()
+
+    cpdef void notify_flush(self):
+        if not self._flush_event:
+            self._flush_event = True
+
+    cdef void _maybe_flush(self):
+        if self._flush_event or self._output_pos > 10_000_000:

Review comment:
       ```suggestion
           if self._flush_event or self._output_pos > 10_000_000:
   ```
   
   Why not call the _maybe_flush method defined in the parent class?
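
A rough pure-Python sketch of delegating the size check to the parent, for illustration only. `SizeBasedStream` and `TimeBasedStream` are hypothetical stand-ins; clearing the flag after a timed flush is an assumption, since the diff is cut off before the method body, and the call syntax in the `.pyx` file may differ.

```python
class SizeBasedStream:
    """Hypothetical stand-in for the size-based parent class."""

    FLUSH_THRESHOLD = 10_000_000  # same constant as in the diff

    def __init__(self):
        self._output_pos = 0
        self.flush_count = 0

    def flush(self):
        self.flush_count += 1
        self._output_pos = 0

    def _maybe_flush(self):
        if self._output_pos > self.FLUSH_THRESHOLD:
            self.flush()


class TimeBasedStream(SizeBasedStream):
    """Hypothetical stand-in for the time-based subclass."""

    def __init__(self):
        super().__init__()
        self._flush_event = False

    def notify_flush(self):
        self._flush_event = True

    def _maybe_flush(self):
        if self._flush_event:
            # Timer fired: flush now and clear the flag (assumed behavior).
            self.flush()
            self._flush_event = False
        else:
            # Otherwise defer to the parent's size check instead of repeating it.
            super()._maybe_flush()


stream = TimeBasedStream()
stream._maybe_flush()            # nothing pending, no flush
stream.notify_flush()
stream._maybe_flush()            # timer path
stream._output_pos = 10_000_001
stream._maybe_flush()            # size path, handled by the parent
```

The threshold then lives in one place, so a later change to the size limit would not need to be repeated in the subclass.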





