HuangXingBo commented on a change in pull request #18292:
URL: https://github.com/apache/flink/pull/18292#discussion_r779999483



##########
File path: flink-python/pyflink/fn_execution/utils/operation_utils.py
##########
@@ -271,3 +273,37 @@ def load_aggregate_function(payload):
         return cls()
     else:
         return pickle.loads(payload)
+
+
+class PeriodicThread(threading.Thread):
+    """Call a function periodically with the specified number of seconds"""
+
+    def __init__(self,
+                 interval,
+                 function,
+                 args=None,
+                 kwargs=None
+                 ):
+        # type: (...) -> None

Review comment:
       ```suggestion
        
   ```

##########
File path: flink-python/pyflink/fn_execution/utils/operation_utils.py
##########
@@ -271,3 +273,37 @@ def load_aggregate_function(payload):
         return cls()
     else:
         return pickle.loads(payload)
+
+
+class PeriodicThread(threading.Thread):
+    """Call a function periodically with the specified number of seconds"""
+
+    def __init__(self,
+                 interval,
+                 function,
+                 args=None,
+                 kwargs=None
+                 ):
+        # type: (...) -> None
+        threading.Thread.__init__(self)
+        self._interval = interval
+        self._function = function
+        self._args = args if args is not None else []
+        self._kwargs = kwargs if kwargs is not None else {}
+        self._finished = threading.Event()
+
+    def run(self):
+        # type: () -> None

Review comment:
       ```suggestion
       def run(self) -> None:
   ```

##########
File path: flink-python/pyflink/fn_execution/utils/operation_utils.py
##########
@@ -271,3 +273,37 @@ def load_aggregate_function(payload):
         return cls()
     else:
         return pickle.loads(payload)
+
+
+class PeriodicThread(threading.Thread):
+    """Call a function periodically with the specified number of seconds"""
+
+    def __init__(self,
+                 interval,
+                 function,
+                 args=None,
+                 kwargs=None
+                 ):
+        # type: (...) -> None
+        threading.Thread.__init__(self)
+        self._interval = interval
+        self._function = function
+        self._args = args if args is not None else []
+        self._kwargs = kwargs if kwargs is not None else {}
+        self._finished = threading.Event()
+
+    def run(self):
+        # type: () -> None
+        next_call = time.time() + self._interval
+        now = time.time()
+        while (next_call <= now and not self._finished.is_set()) or \
+                (not self._finished.wait(next_call - now)):
+            next_call = next_call + self._interval

Review comment:
       When next_call <= now, setting next_call = now + self._interval may be more appropriate.

##########
File path: flink-python/pyflink/fn_execution/utils/operation_utils.py
##########
@@ -271,3 +273,37 @@ def load_aggregate_function(payload):
         return cls()
     else:
         return pickle.loads(payload)
+
+
+class PeriodicThread(threading.Thread):
+    """Call a function periodically with the specified number of seconds"""
+
+    def __init__(self,
+                 interval,
+                 function,
+                 args=None,
+                 kwargs=None
+                 ):
+        # type: (...) -> None
+        threading.Thread.__init__(self)
+        self._interval = interval
+        self._function = function
+        self._args = args if args is not None else []
+        self._kwargs = kwargs if kwargs is not None else {}
+        self._finished = threading.Event()
+
+    def run(self):
+        # type: () -> None
+        next_call = time.time() + self._interval
+        now = time.time()

Review comment:
       ```suggestion
           now = time.time()
           next_call = now + self._interval
           
   ```

##########
File path: flink-python/pyflink/fn_execution/utils/operation_utils.py
##########
@@ -271,3 +273,37 @@ def load_aggregate_function(payload):
         return cls()
     else:
         return pickle.loads(payload)
+
+
+class PeriodicThread(threading.Thread):
+    """Call a function periodically with the specified number of seconds"""
+
+    def __init__(self,
+                 interval,
+                 function,
+                 args=None,
+                 kwargs=None
+                 ):
+        # type: (...) -> None
+        threading.Thread.__init__(self)
+        self._interval = interval
+        self._function = function
+        self._args = args if args is not None else []
+        self._kwargs = kwargs if kwargs is not None else {}
+        self._finished = threading.Event()
+
+    def run(self):
+        # type: () -> None
+        next_call = time.time() + self._interval
+        now = time.time()
+        while (next_call <= now and not self._finished.is_set()) or \
+                (not self._finished.wait(next_call - now)):
+            next_call = next_call + self._interval
+            self._function(*self._args, **self._kwargs)
+            now = time.time()
+
+    def cancel(self):
+        # type: () -> None

Review comment:
       ```suggestion
       def cancel(self) -> None:
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to