srkukarni closed pull request #2626: Make use of workerconfig defined health
check interval
URL: https://github.com/apache/incubator-pulsar/pull/2626
This pull request was merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork
once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py
b/pulsar-functions/instance/src/main/python/python_instance.py
index ddd546e1f1..f8a14ffc1a 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -114,7 +114,7 @@ def update(self, object):
class PythonInstance(object):
- def __init__(self, instance_id, function_id, function_version,
function_details, max_buffered_tuples, user_code, pulsar_client):
+ def __init__(self, instance_id, function_id, function_version,
function_details, max_buffered_tuples, expected_healthcheck_interval,
user_code, pulsar_client):
self.instance_config = InstanceConfig(instance_id, function_id,
function_version, function_details, max_buffered_tuples)
self.user_code = user_code
self.queue = Queue.Queue(max_buffered_tuples)
@@ -138,6 +138,7 @@ def __init__(self, instance_id, function_id,
function_version, function_details,
self.stats = Stats()
self.last_health_check_ts = time.time()
self.timeout_ms = function_details.source.timeoutMs if
function_details.source.timeoutMs > 0 else None
+ self.expected_healthcheck_interval = expected_healthcheck_interval
def health_check(self):
self.last_health_check_ts = time.time()
@@ -146,12 +147,12 @@ def health_check(self):
return health_check_result
def process_spawner_health_check_timer(self):
- if time.time() - self.last_health_check_ts > 90:
+ if time.time() - self.last_health_check_ts >
self.expected_healthcheck_interval * 3:
Log.critical("Haven't received health check from spawner in a while.
Stopping instance...")
os.kill(os.getpid(), signal.SIGKILL)
sys.exit(1)
- Timer(30, self.process_spawner_health_check_timer).start()
+ Timer(self.expected_healthcheck_interval,
self.process_spawner_health_check_timer).start()
def run(self):
# Setup consumers and input deserializers
@@ -214,7 +215,8 @@ def run(self):
# start proccess spawner health check timer
self.last_health_check_ts = time.time()
- Timer(30, self.process_spawner_health_check_timer).start()
+ if self.expected_healthcheck_interval > 0:
+ Timer(self.expected_healthcheck_interval,
self.process_spawner_health_check_timer).start()
def actual_execution(self):
Log.info("Started Thread for executing the function")
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py
b/pulsar-functions/instance/src/main/python/python_instance_main.py
index d9f1132bfb..2f5c8959fa 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -70,6 +70,7 @@ def main():
parser.add_argument('--max_buffered_tuples', required=True, help='Maximum
number of Buffered tuples')
parser.add_argument('--logging_directory', required=True, help='Logging
Directory')
parser.add_argument('--logging_file', required=True, help='Log file name')
+ parser.add_argument('--expected_healthcheck_interval', required=True,
help='Expected time in seconds between health checks', type=int)
args = parser.parse_args()
function_details = Function_pb2.FunctionDetails()
@@ -97,7 +98,9 @@ def main():
pulsar_client = pulsar.Client(args.pulsar_serviceurl, authentication, 30, 1,
1, 50000, None, use_tls, tls_trust_cert_path, tls_allow_insecure_connection)
pyinstance = python_instance.PythonInstance(str(args.instance_id),
str(args.function_id),
str(args.function_version),
function_details,
- int(args.max_buffered_tuples),
str(args.py), pulsar_client)
+ int(args.max_buffered_tuples),
+
int(args.expected_healthcheck_interval),
+ str(args.py), pulsar_client)
pyinstance.run()
server_instance = server.serve(args.port, pyinstance)
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 083686b503..38a4c281e8 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -92,6 +92,9 @@
@Parameter(names = "--max_buffered_tuples", description = "Maximum number
of tuples to buffer\n", required = true)
protected int maxBufferedTuples;
+ @Parameter(names = "--expected_healthcheck_interval", description =
"Expected interval in seconds between healtchecks", required = true)
+ protected int expectedHealthCheckInterval;
+
private Server server;
private RuntimeSpawner runtimeSpawner;
private ThreadRuntimeFactory containerFactory;
@@ -124,7 +127,7 @@ public void start() throws Exception {
instanceConfig,
jarFile,
containerFactory,
- 30000);
+ expectedHealthCheckInterval * 1000);
server = ServerBuilder.forPort(port)
.addService(new InstanceControlImpl(runtimeSpawner))
@@ -146,20 +149,22 @@ public void run() {
log.info("Starting runtimeSpawner");
runtimeSpawner.start();
- timer = Executors.newSingleThreadScheduledExecutor();
- timer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- try {
- if (System.currentTimeMillis() - lastHealthCheckTs >
90000) {
- log.info("Haven't received health check from spawner
in a while. Stopping instance...");
- close();
+ if (expectedHealthCheckInterval > 0) {
+ timer = Executors.newSingleThreadScheduledExecutor();
+ timer.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ if (System.currentTimeMillis() - lastHealthCheckTs > 3
* expectedHealthCheckInterval * 1000) {
+ log.info("Haven't received health check from
spawner in a while. Stopping instance...");
+ close();
+ }
+ } catch (Exception e) {
+ log.error("Error occurred when checking for latest
health check", e);
}
- } catch (Exception e) {
- log.error("Error occurred when checking for latest health
check", e);
}
- }
- }, 30000, 30000, TimeUnit.MILLISECONDS);
+ }, expectedHealthCheckInterval * 1000, expectedHealthCheckInterval
* 1000, TimeUnit.MILLISECONDS);
+ }
runtimeSpawner.join();
log.info("RuntimeSpawner quit, shutting down JavaInstance");
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index c5159b0706..044f636535 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -73,11 +73,12 @@
String codeFile,
String pulsarServiceUrl,
String stateStorageServiceUrl,
- AuthenticationConfig authConfig) throws Exception {
+ AuthenticationConfig authConfig,
+ Long expectedHealthCheckInterval) throws Exception {
this.instanceConfig = instanceConfig;
this.instancePort = instanceConfig.getPort();
this.processArgs = composeArgs(instanceConfig, instanceFile,
logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
- authConfig);
+ authConfig, expectedHealthCheckInterval);
}
private List<String> composeArgs(InstanceConfig instanceConfig,
@@ -86,7 +87,8 @@
String codeFile,
String pulsarServiceUrl,
String stateStorageServiceUrl,
- AuthenticationConfig authConfig) throws
Exception {
+ AuthenticationConfig authConfig,
+ Long expectedHealthCheckInterval) throws
Exception {
List<String> args = new LinkedList<>();
if (instanceConfig.getFunctionDetails().getRuntime() ==
Function.FunctionDetails.Runtime.JAVA) {
args.add("java");
@@ -167,6 +169,8 @@
args.add("--state_storage_serviceurl");
args.add(stateStorageServiceUrl);
}
+ args.add("--expected_healthcheck_interval");
+ args.add(String.valueOf(expectedHealthCheckInterval));
return args;
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
index b16e88d262..7b910bc295 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
@@ -91,7 +91,8 @@ public ProcessRuntimeFactory(String pulsarServiceUrl,
}
@Override
- public ProcessRuntime createContainer(InstanceConfig instanceConfig,
String codeFile) throws Exception {
+ public ProcessRuntime createContainer(InstanceConfig instanceConfig,
String codeFile,
+ Long expectedHealthCheckInterval)
throws Exception {
String instanceFile;
switch (instanceConfig.getFunctionDetails().getRuntime()) {
case JAVA:
@@ -110,7 +111,8 @@ public ProcessRuntime createContainer(InstanceConfig
instanceConfig, String code
codeFile,
pulsarServiceUrl,
stateStorageServiceUrl,
- authConfig);
+ authConfig,
+ expectedHealthCheckInterval);
}
@Override
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
index 0fe1f9f9e5..ef2ea9c569 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
@@ -33,7 +33,8 @@
* @return function container to start/stop instance
*/
Runtime createContainer(
- InstanceConfig instanceConfig, String codeFile) throws Exception;
+ InstanceConfig instanceConfig, String codeFile,
+ Long expectedHealthCheckInterval) throws Exception;
@Override
void close();
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index aad7d4a807..00c09aef7f 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -74,7 +74,8 @@ public void start() throws Exception {
throw new IllegalArgumentException("topics-pattern is not
supported for python function");
}
- runtime = runtimeFactory.createContainer(this.instanceConfig,
codeFile);
+ runtime = runtimeFactory.createContainer(this.instanceConfig, codeFile,
+ instanceLivenessCheckFreqMs * 1000);
runtime.start();
// monitor function runtime to make sure it is running. If not,
restart the function runtime
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
index f9c9cff3f8..84d21af6ae 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
@@ -81,7 +81,8 @@ private static PulsarClient createPulsarClient(String
pulsarServiceUrl, Authenti
}
@Override
- public ThreadRuntime createContainer(InstanceConfig instanceConfig, String
jarFile) {
+ public ThreadRuntime createContainer(InstanceConfig instanceConfig, String
jarFile,
+ Long expectedHealthCheckInterval) {
return new ThreadRuntime(
instanceConfig,
fnCache,
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 6f1e2f3ea6..97881a4c84 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -114,9 +114,9 @@ InstanceConfig
createJavaInstanceConfig(FunctionDetails.Runtime runtime) {
public void testJavaConstructor() throws Exception {
InstanceConfig config =
createJavaInstanceConfig(FunctionDetails.Runtime.JAVA);
- ProcessRuntime container = factory.createContainer(config,
userJarFile);
+ ProcessRuntime container = factory.createContainer(config,
userJarFile, 30l);
List<String> args = container.getProcessArgs();
- assertEquals(args.size(), 26);
+ assertEquals(args.size(), 28);
String expectedArgs = "java -cp " + javaInstanceJarFile
+ " -Dpulsar.functions.java.instance.jar=" +
javaInstanceJarFile
+ " -Dlog4j.configurationFile=java_instance_log4j2.yml "
@@ -129,7 +129,8 @@ public void testJavaConstructor() throws Exception {
+ " --function_details " +
JsonFormat.printer().print(config.getFunctionDetails())
+ " --pulsar_serviceurl " + pulsarServiceUrl
+ " --max_buffered_tuples 1024 --port " + args.get(23)
- + " --state_storage_serviceurl " + stateStorageServiceUrl;
+ + " --state_storage_serviceurl " + stateStorageServiceUrl
+ + " --expected_healthcheck_interval 30";
assertEquals(String.join(" ", args), expectedArgs);
}
@@ -137,9 +138,9 @@ public void testJavaConstructor() throws Exception {
public void testPythonConstructor() throws Exception {
InstanceConfig config =
createJavaInstanceConfig(FunctionDetails.Runtime.PYTHON);
- ProcessRuntime container = factory.createContainer(config,
userJarFile);
+ ProcessRuntime container = factory.createContainer(config,
userJarFile, 30l);
List<String> args = container.getProcessArgs();
- assertEquals(args.size(), 22);
+ assertEquals(args.size(), 24);
String expectedArgs = "python " + pythonInstanceFile
+ " --py " + userJarFile + " --logging_directory "
+ logDirectory + "/functions" + " --logging_file " +
config.getFunctionDetails().getName() + " --instance_id "
@@ -147,7 +148,8 @@ public void testPythonConstructor() throws Exception {
+ " --function_version " + config.getFunctionVersion()
+ " --function_details " +
JsonFormat.printer().print(config.getFunctionDetails())
+ " --pulsar_serviceurl " + pulsarServiceUrl
- + " --max_buffered_tuples 1024 --port " + args.get(21);
+ + " --max_buffered_tuples 1024 --port " + args.get(21)
+ + " --expected_healthcheck_interval 30";
assertEquals(String.join(" ", args), expectedArgs);
}
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index 575447792c..05e3422d39 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -103,7 +103,7 @@ public void testStartFunctionWithPkgUrl() throws Exception {
RuntimeFactory factory = mock(RuntimeFactory.class);
Runtime runtime = mock(Runtime.class);
- doReturn(runtime).when(factory).createContainer(any(), any());
+ doReturn(runtime).when(factory).createContainer(any(), any(), any());
doNothing().when(runtime).start();
Namespace dlogNamespace = mock(Namespace.class);
final String exceptionMsg = "dl namespace not-found";
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services