HuangXingBo commented on a change in pull request #18292:
URL: https://github.com/apache/flink/pull/18292#discussion_r779999483
##########
File path: flink-python/pyflink/fn_execution/utils/operation_utils.py
##########
@@ -271,3 +273,37 @@ def load_aggregate_function(payload):
return cls()
else:
return pickle.loads(payload)
+
+
+class PeriodicThread(threading.Thread):
+ """Call a function periodically with the specified number of seconds"""
+
+ def __init__(self,
+ interval,
+ function,
+ args=None,
+ kwargs=None
+ ):
+ # type: (...) -> None
Review comment:
```suggestion
```
##########
File path: flink-python/pyflink/fn_execution/utils/operation_utils.py
##########
@@ -271,3 +273,37 @@ def load_aggregate_function(payload):
return cls()
else:
return pickle.loads(payload)
+
+
+class PeriodicThread(threading.Thread):
+ """Call a function periodically with the specified number of seconds"""
+
+ def __init__(self,
+ interval,
+ function,
+ args=None,
+ kwargs=None
+ ):
+ # type: (...) -> None
+ threading.Thread.__init__(self)
+ self._interval = interval
+ self._function = function
+ self._args = args if args is not None else []
+ self._kwargs = kwargs if kwargs is not None else {}
+ self._finished = threading.Event()
+
+ def run(self):
+ # type: () -> None
Review comment:
```suggestion
def run(self) -> None:
```
##########
File path: flink-python/pyflink/fn_execution/utils/operation_utils.py
##########
@@ -271,3 +273,37 @@ def load_aggregate_function(payload):
return cls()
else:
return pickle.loads(payload)
+
+
+class PeriodicThread(threading.Thread):
+ """Call a function periodically with the specified number of seconds"""
+
+ def __init__(self,
+ interval,
+ function,
+ args=None,
+ kwargs=None
+ ):
+ # type: (...) -> None
+ threading.Thread.__init__(self)
+ self._interval = interval
+ self._function = function
+ self._args = args if args is not None else []
+ self._kwargs = kwargs if kwargs is not None else {}
+ self._finished = threading.Event()
+
+ def run(self):
+ # type: () -> None
+ next_call = time.time() + self._interval
+ now = time.time()
+ while (next_call <= now and not self._finished.is_set()) or \
+ (not self._finished.wait(next_call - now)):
+ next_call = next_call + self._interval
Review comment:
When `next_call <= now`, setting `next_call = now + self._interval` may be more appropriate.
##########
File path: flink-python/pyflink/fn_execution/utils/operation_utils.py
##########
@@ -271,3 +273,37 @@ def load_aggregate_function(payload):
return cls()
else:
return pickle.loads(payload)
+
+
+class PeriodicThread(threading.Thread):
+ """Call a function periodically with the specified number of seconds"""
+
+ def __init__(self,
+ interval,
+ function,
+ args=None,
+ kwargs=None
+ ):
+ # type: (...) -> None
+ threading.Thread.__init__(self)
+ self._interval = interval
+ self._function = function
+ self._args = args if args is not None else []
+ self._kwargs = kwargs if kwargs is not None else {}
+ self._finished = threading.Event()
+
+ def run(self):
+ # type: () -> None
+ next_call = time.time() + self._interval
+ now = time.time()
Review comment:
```suggestion
now = time.time()
next_call = now + self._interval
```
##########
File path: flink-python/pyflink/fn_execution/utils/operation_utils.py
##########
@@ -271,3 +273,37 @@ def load_aggregate_function(payload):
return cls()
else:
return pickle.loads(payload)
+
+
+class PeriodicThread(threading.Thread):
+ """Call a function periodically with the specified number of seconds"""
+
+ def __init__(self,
+ interval,
+ function,
+ args=None,
+ kwargs=None
+ ):
+ # type: (...) -> None
+ threading.Thread.__init__(self)
+ self._interval = interval
+ self._function = function
+ self._args = args if args is not None else []
+ self._kwargs = kwargs if kwargs is not None else {}
+ self._finished = threading.Event()
+
+ def run(self):
+ # type: () -> None
+ next_call = time.time() + self._interval
+ now = time.time()
+ while (next_call <= now and not self._finished.is_set()) or \
+ (not self._finished.wait(next_call - now)):
+ next_call = next_call + self._interval
+ self._function(*self._args, **self._kwargs)
+ now = time.time()
+
+ def cancel(self):
+ # type: () -> None
Review comment:
```suggestion
def cancel(self) -> None:
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]