dianfu commented on a change in pull request #16467:
URL: https://github.com/apache/flink/pull/16467#discussion_r686617600
##########
File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_fast.pyx
##########
@@ -94,10 +94,13 @@ cdef class
FlinkLengthPrefixCoderBeamWrapper(StreamCoderImpl):
"""
def __cinit__(self, value_coder):
self._value_coder = value_coder
+ self._output_stream = None
cpdef encode_to_stream(self, value, BOutputStream out_stream, bint nested):
- output_stream = BeamOutputStream(out_stream)
- self._value_coder.encode_to_stream(value, output_stream)
+ if not self._output_stream:
+ self._output_stream = BeamTimeBasedOutputStream()
Review comment:
Why not initialize **_output_stream** in the constructor?
##########
File path: flink-python/pyflink/fn_execution/beam/beam_coder_impl_slow.py
##########
@@ -58,4 +58,25 @@ def decode_from_stream(self, in_stream: create_InputStream,
nested):
return self._value_coder.decode_from_stream(data_input_stream)
def __repr__(self):
- return 'FlinkCoderBeamWrapper[%s]' % self._value_coder
+ return 'FlinkFieldCoderBeamWrapper[%s]' % self._value_coder
+
+
+class FlinkLengthPrefixCoderBeamWrapper(FlinkFieldCoderBeamWrapper):
+ """
+ Bridge between Beam coder and Flink coder for the top-level
LengthPrefixCoder.
+ """
+ def __init__(self, value_coder):
+ super(FlinkLengthPrefixCoderBeamWrapper, self).__init__(value_coder)
+ self._output_stream = None
Review comment:
Could we give it a more meaningful name? It's difficult to differentiate
between _output_stream and out_stream.
##########
File path:
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -317,9 +321,41 @@ private void checkInvokeFinishBundleByTime() throws
Exception {
protected void invokeFinishBundle() throws Exception {
if (elementCount > 0) {
- pythonFunctionRunner.flush();
- elementCount = 0;
+ AtomicBoolean flushThreadFinish = new AtomicBoolean(false);
+ CountDownLatch flushThreadStart = new CountDownLatch(1);
+ AtomicReference<Exception> exceptionReference = new
AtomicReference<>();
+ Thread flushThread =
Review comment:
It would be better to create a single-thread pool for this, e.g.
Executors.newSingleThreadExecutor. That would avoid creating a new thread for
each bundle.
##########
File path: flink-python/pyflink/fn_execution/beam/beam_stream_slow.py
##########
@@ -33,3 +34,31 @@ def read_byte(self):
def size(self):
return self._input_stream.size()
+
+
+class BeamTimeBasedOutputStream(create_OutputStream):
+ def __init__(self):
+ super(BeamTimeBasedOutputStream).__init__()
+ self._flush_event = False
+ self._periodic_flusher = PeriodicThread(1, self.notify_flush)
+ self._periodic_flusher.daemon = True
+ self._periodic_flusher.start()
+ self._output_stream = None
+
+ def write(self, b: bytes):
+ self._output_stream.write(b)
+ if self._flush_event:
+ self._output_stream.flush()
+ self._flush_event = False
+
+ def parse_output_stream(self, output_stream: create_OutputStream):
+ self._output_stream = output_stream
+
+ def notify_flush(self):
+ if not self._flush_event:
Review comment:
```suggestion
if not self._flush_event:
```
I think this `if not self._flush_event:` check could be removed: setting the flag to True unconditionally has the same effect.
##########
File path: flink-python/pyflink/fn_execution/beam/beam_stream_slow.py
##########
@@ -33,3 +34,31 @@ def read_byte(self):
def size(self):
return self._input_stream.size()
+
+
+class BeamTimeBasedOutputStream(create_OutputStream):
+ def __init__(self):
+ super(BeamTimeBasedOutputStream).__init__()
+ self._flush_event = False
+ self._periodic_flusher = PeriodicThread(1, self.notify_flush)
+ self._periodic_flusher.daemon = True
+ self._periodic_flusher.start()
+ self._output_stream = None
+
+ def write(self, b: bytes):
+ self._output_stream.write(b)
+ if self._flush_event:
+ self._output_stream.flush()
+ self._flush_event = False
+
+ def parse_output_stream(self, output_stream: create_OutputStream):
Review comment:
```suggestion
def reset_output_stream(self, output_stream: create_OutputStream):
```
##########
File path: flink-python/pyflink/fn_execution/stream_fast.pyx
##########
@@ -94,6 +100,21 @@ cdef class OutputStream:
self.buffer_size = 1024
self.pos = 0
+ cpdef void write(self, bytes v):
Review comment:
Why is this method declared cpdef while the other methods are declared cdef?
##########
File path:
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -317,9 +321,41 @@ private void checkInvokeFinishBundleByTime() throws
Exception {
protected void invokeFinishBundle() throws Exception {
if (elementCount > 0) {
- pythonFunctionRunner.flush();
- elementCount = 0;
+ AtomicBoolean flushThreadFinish = new AtomicBoolean(false);
+ CountDownLatch flushThreadStart = new CountDownLatch(1);
+ AtomicReference<Exception> exceptionReference = new
AtomicReference<>();
+ Thread flushThread =
+ new Thread(
+ () -> {
+ try {
+ flushThreadStart.countDown();
+ pythonFunctionRunner.flush();
+ } catch (Exception e) {
+ exceptionReference.set(e);
+ } finally {
+ flushThreadFinish.set(true);
+ // interrupt the progress of takeResult in
avoid of the main
+ // thread is locked forever.
Review comment:
```suggestion
// thread is blocked forever.
```
##########
File path:
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -317,9 +321,41 @@ private void checkInvokeFinishBundleByTime() throws
Exception {
protected void invokeFinishBundle() throws Exception {
if (elementCount > 0) {
- pythonFunctionRunner.flush();
- elementCount = 0;
+ AtomicBoolean flushThreadFinish = new AtomicBoolean(false);
+ CountDownLatch flushThreadStart = new CountDownLatch(1);
+ AtomicReference<Exception> exceptionReference = new
AtomicReference<>();
+ Thread flushThread =
+ new Thread(
+ () -> {
+ try {
+ flushThreadStart.countDown();
+ pythonFunctionRunner.flush();
+ } catch (Exception e) {
+ exceptionReference.set(e);
+ } finally {
+ flushThreadFinish.set(true);
+ // interrupt the progress of takeResult in
avoid of the main
+ // thread is locked forever.
+ ((BeamPythonFunctionRunner)
pythonFunctionRunner)
+ .noEmptySignal();
Review comment:
```suggestion
.notifyNoMoreResults();
```
##########
File path: flink-python/pyflink/fn_execution/beam/beam_operations_slow.py
##########
@@ -93,6 +94,7 @@ def process(self, o: WindowedValue):
self._value_coder_impl.encode_to_stream(
self.process_element(value), output_stream, True)
output_stream.maybe_flush()
+ self._value_coder_impl._output_stream.close()
Review comment:
When is it opened/started?
##########
File path:
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -317,9 +321,41 @@ private void checkInvokeFinishBundleByTime() throws
Exception {
protected void invokeFinishBundle() throws Exception {
if (elementCount > 0) {
- pythonFunctionRunner.flush();
- elementCount = 0;
+ AtomicBoolean flushThreadFinish = new AtomicBoolean(false);
+ CountDownLatch flushThreadStart = new CountDownLatch(1);
+ AtomicReference<Exception> exceptionReference = new
AtomicReference<>();
+ Thread flushThread =
+ new Thread(
+ () -> {
+ try {
+ flushThreadStart.countDown();
+ pythonFunctionRunner.flush();
+ } catch (Exception e) {
+ exceptionReference.set(e);
+ } finally {
+ flushThreadFinish.set(true);
+ // interrupt the progress of takeResult in
avoid of the main
+ // thread is locked forever.
+ ((BeamPythonFunctionRunner)
pythonFunctionRunner)
+ .noEmptySignal();
+ }
+ });
+ flushThread.start();
+ flushThreadStart.await();
Review comment:
Why do we need to wait for the flush thread to start?
##########
File path:
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -317,9 +321,41 @@ private void checkInvokeFinishBundleByTime() throws
Exception {
protected void invokeFinishBundle() throws Exception {
if (elementCount > 0) {
- pythonFunctionRunner.flush();
- elementCount = 0;
+ AtomicBoolean flushThreadFinish = new AtomicBoolean(false);
+ CountDownLatch flushThreadStart = new CountDownLatch(1);
+ AtomicReference<Exception> exceptionReference = new
AtomicReference<>();
+ Thread flushThread =
+ new Thread(
+ () -> {
+ try {
+ flushThreadStart.countDown();
+ pythonFunctionRunner.flush();
+ } catch (Exception e) {
+ exceptionReference.set(e);
+ } finally {
+ flushThreadFinish.set(true);
+ // interrupt the progress of takeResult in
avoid of the main
Review comment:
```suggestion
// interrupt the progress of takeResult
to avoid the main
```
##########
File path:
flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
##########
@@ -317,9 +321,41 @@ private void checkInvokeFinishBundleByTime() throws
Exception {
protected void invokeFinishBundle() throws Exception {
if (elementCount > 0) {
- pythonFunctionRunner.flush();
- elementCount = 0;
+ AtomicBoolean flushThreadFinish = new AtomicBoolean(false);
+ CountDownLatch flushThreadStart = new CountDownLatch(1);
+ AtomicReference<Exception> exceptionReference = new
AtomicReference<>();
+ Thread flushThread =
+ new Thread(
+ () -> {
+ try {
+ flushThreadStart.countDown();
+ pythonFunctionRunner.flush();
+ } catch (Exception e) {
Review comment:
```suggestion
} catch (Throwable t) {
```
##########
File path: flink-python/pyflink/fn_execution/beam/beam_stream_fast.pyx
##########
@@ -91,12 +90,33 @@ cdef class BeamOutputStream(LengthPrefixOutputStream):
self._output_stream.flush()
self._output_pos = 0
- cdef void _parse_output_stream(self, BOutputStream output_stream):
+ cdef void parse_output_stream(self, BOutputStream output_stream):
+ self._output_stream = output_stream
self._output_data = output_stream.data
self._output_pos = output_stream.pos
self._output_buffer_size = output_stream.buffer_size
cdef void _maybe_flush(self):
if self._output_pos > 10_000_000:
- self._output_stream.flush()
- self._output_pos = 0
+ self.flush()
+
+cdef class BeamTimeBasedOutputStream(BeamSizeBasedOutputStream):
+ def __init__(self, *args, **kwargs):
+ self._flush_event = False
+ self._periodic_flusher = PeriodicThread(1, self.notify_flush)
+ self._periodic_flusher.daemon = True
+ self._periodic_flusher.start()
+
+ cpdef void notify_flush(self):
+ if not self._flush_event:
+ self._flush_event = True
+
+ cdef void _maybe_flush(self):
+ if self._flush_event or self._output_pos > 10_000_000:
Review comment:
```suggestion
if self._flush_event or self._output_pos > 10_000_000:
```
Why not call the _maybe_flush method defined in the parent class?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]