This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a3f6aba k8s runtime: force deletion to avoid hung function worker
during connector restart (#12504)
a3f6aba is described below
commit a3f6aba81a7bbd55a8429cd724694cdcee7d3f2e
Author: Andrey Yegorov <[email protected]>
AuthorDate: Sat Nov 6 13:17:04 2021 -0700
k8s runtime: force deletion to avoid hung function worker during connector
restart (#12504)
---
conf/functions_worker.yml | 4 ++++
.../functions/runtime/kubernetes/KubernetesRuntime.java | 11 +++++++----
.../runtime/kubernetes/KubernetesRuntimeFactory.java | 3 +++
.../runtime/kubernetes/KubernetesRuntimeFactoryConfig.java | 6 ++++++
site2/docs/functions-runtime.md | 4 ++++
5 files changed, 24 insertions(+), 4 deletions(-)
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 58dfc69..9ca5f7b 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -216,6 +216,10 @@ functionRuntimeFactoryConfigs:
# extraFunctionDependenciesDir:
# # Additional memory padding added on top of the memory requested by the
function per on a per instance basis
# percentMemoryPadding: 10
+# # The duration in seconds before the StatefulSet deleted on function
stop/restart.
+# # Value must be non-negative integer. The value zero indicates delete
immediately.
+# # Default is 5 seconds.
+# gracePeriodSeconds: 5
## A set of the minimum amount of resources functions must request.
## Support for this depends on function runtime.
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index a483bd0..1301596 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -149,6 +149,7 @@ public class KubernetesRuntime implements Runtime {
private int percentMemoryPadding;
private double cpuOverCommitRatio;
private double memoryOverCommitRatio;
+ private int gracePeriodSeconds;
private final Optional<KubernetesFunctionAuthProvider>
functionAuthDataCacheProvider;
private final AuthenticationConfig authConfig;
private Integer grpcPort;
@@ -186,6 +187,7 @@ public class KubernetesRuntime implements Runtime {
int percentMemoryPadding,
double cpuOverCommitRatio,
double memoryOverCommitRatio,
+ int gracePeriodSeconds,
Optional<KubernetesFunctionAuthProvider>
functionAuthDataCacheProvider,
boolean authenticationEnabled,
Integer grpcPort,
@@ -212,6 +214,7 @@ public class KubernetesRuntime implements Runtime {
this.percentMemoryPadding = percentMemoryPadding;
this.cpuOverCommitRatio = cpuOverCommitRatio;
this.memoryOverCommitRatio = memoryOverCommitRatio;
+ this.gracePeriodSeconds = gracePeriodSeconds;
this.authenticationEnabled = authenticationEnabled;
this.manifestCustomizer = manifestCustomizer;
this.functionInstanceClassPath = functionInstanceClassPath;
@@ -567,7 +570,7 @@ public class KubernetesRuntime implements Runtime {
public void deleteStatefulSet() throws InterruptedException {
String statefulSetName =
createJobName(instanceConfig.getFunctionDetails(), this.jobName);
final V1DeleteOptions options = new V1DeleteOptions();
- options.setGracePeriodSeconds(5L);
+ options.setGracePeriodSeconds((long)gracePeriodSeconds);
options.setPropagationPolicy("Foreground");
String fqfn =
FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails());
@@ -583,8 +586,8 @@ public class KubernetesRuntime implements Runtime {
response = appsClient.deleteNamespacedStatefulSetCall(
statefulSetName,
jobNamespace, null, null,
- 5, null, "Foreground",
- null, null)
+ gracePeriodSeconds, null, "Foreground",
+ options, null)
.execute();
} catch (ApiException e) {
// if already deleted
@@ -735,7 +738,7 @@ public class KubernetesRuntime implements Runtime {
serviceName,
jobNamespace, null, null,
0, null,
- "Foreground", null, null).execute();
+ "Foreground", options, null).execute();
} catch (ApiException e) {
// if already deleted
if (e.getCode() == HTTP_NOT_FOUND) {
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index 2a47da6..d98a161 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -103,6 +103,7 @@ public class KubernetesRuntimeFactory implements
RuntimeFactory {
private String narExtractionDirectory;
private String functionInstanceClassPath;
private String downloadDirectory;
+ private int gracePeriodSeconds;
@ToString.Exclude
@EqualsAndHashCode.Exclude
@@ -200,6 +201,7 @@ public class KubernetesRuntimeFactory implements
RuntimeFactory {
this.percentMemoryPadding = factoryConfig.getPercentMemoryPadding();
this.cpuOverCommitRatio = factoryConfig.getCpuOverCommitRatio();
this.memoryOverCommitRatio = factoryConfig.getMemoryOverCommitRatio();
+ this.gracePeriodSeconds = factoryConfig.getGracePeriodSeconds();
this.pulsarServiceUrl =
StringUtils.isEmpty(factoryConfig.getPulsarServiceUrl())
? workerConfig.getPulsarServiceUrl() :
factoryConfig.getPulsarServiceUrl();
this.pulsarAdminUrl =
StringUtils.isEmpty(factoryConfig.getPulsarAdminUrl())
@@ -318,6 +320,7 @@ public class KubernetesRuntimeFactory implements
RuntimeFactory {
percentMemoryPadding,
cpuOverCommitRatio,
memoryOverCommitRatio,
+ gracePeriodSeconds,
authProvider,
authenticationEnabled,
grpcPort,
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
index a3ef418..e3b758f 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
@@ -161,5 +161,11 @@ public class KubernetesRuntimeFactoryConfig {
doc = "The classpath where function instance files stored"
)
private String functionInstanceClassPath = "";
+ @FieldContext(
+ doc = "The duration in seconds before the StatefulSet deleted on
function stop/restart. " +
+ "Value must be non-negative integer. The value zero
indicates delete immediately. " +
+ "Default is 5 seconds."
+ )
+ protected int gracePeriodSeconds = 5;
}
diff --git a/site2/docs/functions-runtime.md b/site2/docs/functions-runtime.md
index 0155f17..09eb1ef 100644
--- a/site2/docs/functions-runtime.md
+++ b/site2/docs/functions-runtime.md
@@ -136,6 +136,10 @@ functionRuntimeFactoryConfigs:
extraFunctionDependenciesDir:
# Additional memory padding added on top of the memory requested by the
function per on a per instance basis
percentMemoryPadding: 10
+ # The duration (in seconds) before the StatefulSet is deleted after a
function stops or restarts.
+ # Value must be a non-negative integer. 0 indicates the StatefulSet is
deleted immediately.
+ # Default is 5 seconds.
+ gracePeriodSeconds: 5
```
If you run functions worker embedded in a broker on Kubernetes, you can use
the default settings.