chadrik commented on a change in pull request #13060:
URL: https://github.com/apache/beam/pull/13060#discussion_r505933134
##########
File path:
sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
##########
@@ -1062,34 +1104,41 @@ def close(self):
class ControlFuture(object):
- def __init__(self, instruction_id, response=None):
+ def __init__(self,
+ instruction_id, # type: str
+ response=None # type:
Optional[beam_fn_api_pb2.InstructionResponse]
+ ):
+ # type: (...) -> None
self.instruction_id = instruction_id
- if response:
- self._response = response
- else:
- self._response = None
+ self._response = response
+ if response is None:
self._condition = threading.Condition()
- self._exception = None
+ self._exception = None # type: Optional[Exception]
def is_done(self):
+ # type: () -> bool
return self._response is not None
def set(self, response):
+ # type: (beam_fn_api_pb2.InstructionResponse) -> None
with self._condition:
self._response = response
self._condition.notify_all()
def get(self, timeout=None):
+ # type: (Optional[float]) -> beam_fn_api_pb2.InstructionResponse
if not self._response and not self._exception:
with self._condition:
if not self._response and not self._exception:
self._condition.wait(timeout)
if self._exception:
raise self._exception
else:
+ assert self._response is not None
Review comment:
It would be good to get confirmation that this assert is correct.
AFAICT this method should never return None.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]