srkukarni closed pull request #2626: Make use of workerconfig defined health check interval
URL: https://github.com/apache/incubator-pulsar/pull/2626
 
 
   

This pull request was merged from a forked repository (a "foreign" pull
request). Because GitHub hides the original diff of such a pull request
once it is merged, the diff is reproduced below for the sake of provenance:

diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index ddd546e1f1..f8a14ffc1a 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -114,7 +114,7 @@ def update(self, object):
     
 
 class PythonInstance(object):
-  def __init__(self, instance_id, function_id, function_version, 
function_details, max_buffered_tuples, user_code, pulsar_client):
+  def __init__(self, instance_id, function_id, function_version, 
function_details, max_buffered_tuples, expected_healthcheck_interval, 
user_code, pulsar_client):
     self.instance_config = InstanceConfig(instance_id, function_id, 
function_version, function_details, max_buffered_tuples)
     self.user_code = user_code
     self.queue = Queue.Queue(max_buffered_tuples)
@@ -138,6 +138,7 @@ def __init__(self, instance_id, function_id, 
function_version, function_details,
     self.stats = Stats()
     self.last_health_check_ts = time.time()
     self.timeout_ms = function_details.source.timeoutMs if 
function_details.source.timeoutMs > 0 else None
+    self.expected_healthcheck_interval = expected_healthcheck_interval
 
   def health_check(self):
     self.last_health_check_ts = time.time()
@@ -146,12 +147,12 @@ def health_check(self):
     return health_check_result
 
   def process_spawner_health_check_timer(self):
-    if time.time() - self.last_health_check_ts > 90:
+    if time.time() - self.last_health_check_ts > 
self.expected_healthcheck_interval * 3:
       Log.critical("Haven't received health check from spawner in a while. 
Stopping instance...")
       os.kill(os.getpid(), signal.SIGKILL)
       sys.exit(1)
 
-    Timer(30, self.process_spawner_health_check_timer).start()
+    Timer(self.expected_healthcheck_interval, 
self.process_spawner_health_check_timer).start()
 
   def run(self):
     # Setup consumers and input deserializers
@@ -214,7 +215,8 @@ def run(self):
 
     # start proccess spawner health check timer
     self.last_health_check_ts = time.time()
-    Timer(30, self.process_spawner_health_check_timer).start()
+    if self.expected_healthcheck_interval > 0:
+      Timer(self.expected_healthcheck_interval, 
self.process_spawner_health_check_timer).start()
 
   def actual_execution(self):
     Log.info("Started Thread for executing the function")
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py 
b/pulsar-functions/instance/src/main/python/python_instance_main.py
index d9f1132bfb..2f5c8959fa 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -70,6 +70,7 @@ def main():
   parser.add_argument('--max_buffered_tuples', required=True, help='Maximum 
number of Buffered tuples')
   parser.add_argument('--logging_directory', required=True, help='Logging 
Directory')
   parser.add_argument('--logging_file', required=True, help='Log file name')
+  parser.add_argument('--expected_healthcheck_interval', required=True, 
help='Expected time in seconds between health checks', type=int)
 
   args = parser.parse_args()
   function_details = Function_pb2.FunctionDetails()
@@ -97,7 +98,9 @@ def main():
   pulsar_client = pulsar.Client(args.pulsar_serviceurl, authentication, 30, 1, 
1, 50000, None, use_tls, tls_trust_cert_path, tls_allow_insecure_connection)
   pyinstance = python_instance.PythonInstance(str(args.instance_id), 
str(args.function_id),
                                               str(args.function_version), 
function_details,
-                                              int(args.max_buffered_tuples), 
str(args.py), pulsar_client)
+                                              int(args.max_buffered_tuples),
+                                              
int(args.expected_healthcheck_interval),
+                                              str(args.py), pulsar_client)
   pyinstance.run()
   server_instance = server.serve(args.port, pyinstance)
 
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 083686b503..38a4c281e8 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -92,6 +92,9 @@
     @Parameter(names = "--max_buffered_tuples", description = "Maximum number 
of tuples to buffer\n", required = true)
     protected int maxBufferedTuples;
 
+    @Parameter(names = "--expected_healthcheck_interval", description = 
"Expected interval in seconds between healtchecks", required = true)
+    protected int expectedHealthCheckInterval;
+
     private Server server;
     private RuntimeSpawner runtimeSpawner;
     private ThreadRuntimeFactory containerFactory;
@@ -124,7 +127,7 @@ public void start() throws Exception {
                 instanceConfig,
                 jarFile,
                 containerFactory,
-                30000);
+                expectedHealthCheckInterval * 1000);
 
         server = ServerBuilder.forPort(port)
                 .addService(new InstanceControlImpl(runtimeSpawner))
@@ -146,20 +149,22 @@ public void run() {
         log.info("Starting runtimeSpawner");
         runtimeSpawner.start();
 
-        timer = Executors.newSingleThreadScheduledExecutor();
-        timer.scheduleAtFixedRate(new TimerTask() {
-            @Override
-            public void run() {
-                try {
-                    if (System.currentTimeMillis() - lastHealthCheckTs > 
90000) {
-                        log.info("Haven't received health check from spawner 
in a while. Stopping instance...");
-                        close();
+        if (expectedHealthCheckInterval > 0) {
+            timer = Executors.newSingleThreadScheduledExecutor();
+            timer.scheduleAtFixedRate(new TimerTask() {
+                @Override
+                public void run() {
+                    try {
+                        if (System.currentTimeMillis() - lastHealthCheckTs > 3 
* expectedHealthCheckInterval * 1000) {
+                            log.info("Haven't received health check from 
spawner in a while. Stopping instance...");
+                            close();
+                        }
+                    } catch (Exception e) {
+                        log.error("Error occurred when checking for latest 
health check", e);
                     }
-                } catch (Exception e) {
-                    log.error("Error occurred when checking for latest health 
check", e);
                 }
-            }
-        }, 30000, 30000, TimeUnit.MILLISECONDS);
+            }, expectedHealthCheckInterval * 1000, expectedHealthCheckInterval 
* 1000, TimeUnit.MILLISECONDS);
+        }
 
         runtimeSpawner.join();
         log.info("RuntimeSpawner quit, shutting down JavaInstance");
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index c5159b0706..044f636535 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -73,11 +73,12 @@
                    String codeFile,
                    String pulsarServiceUrl,
                    String stateStorageServiceUrl,
-                   AuthenticationConfig authConfig) throws Exception {
+                   AuthenticationConfig authConfig,
+                   Long expectedHealthCheckInterval) throws Exception {
         this.instanceConfig = instanceConfig;
         this.instancePort = instanceConfig.getPort();
         this.processArgs = composeArgs(instanceConfig, instanceFile, 
logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
-                authConfig);
+                authConfig, expectedHealthCheckInterval);
     }
 
     private List<String> composeArgs(InstanceConfig instanceConfig,
@@ -86,7 +87,8 @@
                                      String codeFile,
                                      String pulsarServiceUrl,
                                      String stateStorageServiceUrl,
-                                     AuthenticationConfig authConfig) throws 
Exception {
+                                     AuthenticationConfig authConfig,
+                                     Long expectedHealthCheckInterval) throws 
Exception {
         List<String> args = new LinkedList<>();
         if (instanceConfig.getFunctionDetails().getRuntime() == 
Function.FunctionDetails.Runtime.JAVA) {
             args.add("java");
@@ -167,6 +169,8 @@
             args.add("--state_storage_serviceurl");
             args.add(stateStorageServiceUrl);
         }
+        args.add("--expected_healthcheck_interval");
+        args.add(String.valueOf(expectedHealthCheckInterval));
         return args;
     }
 
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
index b16e88d262..7b910bc295 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
@@ -91,7 +91,8 @@ public ProcessRuntimeFactory(String pulsarServiceUrl,
     }
 
     @Override
-    public ProcessRuntime createContainer(InstanceConfig instanceConfig, 
String codeFile) throws Exception {
+    public ProcessRuntime createContainer(InstanceConfig instanceConfig, 
String codeFile,
+                                          Long expectedHealthCheckInterval) 
throws Exception {
         String instanceFile;
         switch (instanceConfig.getFunctionDetails().getRuntime()) {
             case JAVA:
@@ -110,7 +111,8 @@ public ProcessRuntime createContainer(InstanceConfig 
instanceConfig, String code
             codeFile,
             pulsarServiceUrl,
             stateStorageServiceUrl,
-            authConfig);
+            authConfig,
+            expectedHealthCheckInterval);
     }
 
     @Override
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
index 0fe1f9f9e5..ef2ea9c569 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
@@ -33,7 +33,8 @@
      * @return function container to start/stop instance
      */
     Runtime createContainer(
-            InstanceConfig instanceConfig, String codeFile) throws Exception;
+            InstanceConfig instanceConfig, String codeFile,
+            Long expectedHealthCheckInterval) throws Exception;
 
     @Override
     void close();
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index aad7d4a807..00c09aef7f 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -74,7 +74,8 @@ public void start() throws Exception {
             throw new IllegalArgumentException("topics-pattern is not 
supported for python function");
         }
 
-        runtime = runtimeFactory.createContainer(this.instanceConfig, 
codeFile);
+        runtime = runtimeFactory.createContainer(this.instanceConfig, codeFile,
+                instanceLivenessCheckFreqMs * 1000);
         runtime.start();
 
         // monitor function runtime to make sure it is running.  If not, 
restart the function runtime
diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
index f9c9cff3f8..84d21af6ae 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
@@ -81,7 +81,8 @@ private static PulsarClient createPulsarClient(String 
pulsarServiceUrl, Authenti
     }
     
     @Override
-    public ThreadRuntime createContainer(InstanceConfig instanceConfig, String 
jarFile) {
+    public ThreadRuntime createContainer(InstanceConfig instanceConfig, String 
jarFile,
+                                         Long expectedHealthCheckInterval) {
         return new ThreadRuntime(
             instanceConfig,
             fnCache,
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 6f1e2f3ea6..97881a4c84 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -114,9 +114,9 @@ InstanceConfig 
createJavaInstanceConfig(FunctionDetails.Runtime runtime) {
     public void testJavaConstructor() throws Exception {
         InstanceConfig config = 
createJavaInstanceConfig(FunctionDetails.Runtime.JAVA);
 
-        ProcessRuntime container = factory.createContainer(config, 
userJarFile);
+        ProcessRuntime container = factory.createContainer(config, 
userJarFile, 30l);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 26);
+        assertEquals(args.size(), 28);
         String expectedArgs = "java -cp " + javaInstanceJarFile
                 + " -Dpulsar.functions.java.instance.jar=" + 
javaInstanceJarFile
                 + " -Dlog4j.configurationFile=java_instance_log4j2.yml "
@@ -129,7 +129,8 @@ public void testJavaConstructor() throws Exception {
                 + " --function_details " + 
JsonFormat.printer().print(config.getFunctionDetails())
                 + " --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(23)
-                + " --state_storage_serviceurl " + stateStorageServiceUrl;
+                + " --state_storage_serviceurl " + stateStorageServiceUrl
+                + " --expected_healthcheck_interval 30";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
@@ -137,9 +138,9 @@ public void testJavaConstructor() throws Exception {
     public void testPythonConstructor() throws Exception {
         InstanceConfig config = 
createJavaInstanceConfig(FunctionDetails.Runtime.PYTHON);
 
-        ProcessRuntime container = factory.createContainer(config, 
userJarFile);
+        ProcessRuntime container = factory.createContainer(config, 
userJarFile, 30l);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 22);
+        assertEquals(args.size(), 24);
         String expectedArgs = "python " + pythonInstanceFile
                 + " --py " + userJarFile + " --logging_directory "
                 + logDirectory + "/functions" + " --logging_file " + 
config.getFunctionDetails().getName() + " --instance_id "
@@ -147,7 +148,8 @@ public void testPythonConstructor() throws Exception {
                 + " --function_version " + config.getFunctionVersion()
                 + " --function_details " + 
JsonFormat.printer().print(config.getFunctionDetails())
                 + " --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(21);
+                + " --max_buffered_tuples 1024 --port " + args.get(21)
+                + " --expected_healthcheck_interval 30";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index 575447792c..05e3422d39 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -103,7 +103,7 @@ public void testStartFunctionWithPkgUrl() throws Exception {
 
         RuntimeFactory factory = mock(RuntimeFactory.class);
         Runtime runtime = mock(Runtime.class);
-        doReturn(runtime).when(factory).createContainer(any(), any());
+        doReturn(runtime).when(factory).createContainer(any(), any(), any());
         doNothing().when(runtime).start();
         Namespace dlogNamespace = mock(Namespace.class);
         final String exceptionMsg = "dl namespace not-found";


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to