This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8c3eb34 [conf] Add annotations for documenting function worker
configuration settings (#3117)
8c3eb34 is described below
commit 8c3eb3450a64af0014c77b815635c19ed8f84cc0
Author: Sijie Guo <[email protected]>
AuthorDate: Fri Dec 21 00:18:55 2018 +0800
[conf] Add annotations for documenting function worker configuration
settings (#3117)
*Motivation*
This change is adding annotations to function worker configuration for
generating function worker configuration file.
---
.../pulsar/functions/worker/WorkerConfig.java | 287 +++++++++++++++++++--
1 file changed, 268 insertions(+), 19 deletions(-)
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index d49722f..d37d93a 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -33,6 +33,8 @@ import java.util.Properties;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.common.configuration.Category;
+import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
import lombok.Data;
@@ -53,54 +55,213 @@ public class WorkerConfig implements Serializable,
PulsarConfiguration {
private static final long serialVersionUID = 1L;
+ @Category
+ private static final String CATEGORY_WORKER = "Worker Settings";
+ @Category
+ private static final String CATEGORY_FUNC_PKG = "Function Package
Management";
+ @Category
+ private static final String CATEGORY_FUNC_METADATA_MNG = "Function
Metadata Management";
+ @Category
+ private static final String CATEGORY_FUNC_RUNTIME_MNG = "Function Runtime
Management";
+ @Category
+ private static final String CATEGORY_SECURITY = "Common Security Settings
(applied for both worker and client)";
+ @Category
+ private static final String CATEGORY_WORKER_SECURITY = "Worker Security
Settings";
+ @Category
+ private static final String CATEGORY_CLIENT_SECURITY = "Security settings
for clients talking to brokers";
+ @Category
+ private static final String CATEGORY_STATE = "State Management";
+ @Category
+ private static final String CATEGORY_CONNECTORS = "Connectors";
+
+ @FieldContext(
+ category = CATEGORY_WORKER,
+ doc = "Id to identify a worker instance"
+ )
private String workerId;
+ @FieldContext(
+ category = CATEGORY_WORKER,
+ doc = "Hostname of the worker instance"
+ )
private String workerHostname;
+ @FieldContext(
+ category = CATEGORY_WORKER,
+ doc = "The port for serving worker http requests"
+ )
private Integer workerPort;
+ @FieldContext(
+ category = CATEGORY_WORKER,
+ doc = "The port for serving worker https requests"
+ )
private Integer workerPortTls;
+ @FieldContext(
+ category = CATEGORY_CONNECTORS,
+ doc = "The path to the location to locate builtin connectors"
+ )
private String connectorsDirectory = "./connectors";
+ @FieldContext(
+ category = CATEGORY_FUNC_METADATA_MNG,
+ doc = "The pulsar topic used for storing function metadata"
+ )
private String functionMetadataTopicName;
+ @FieldContext(
+ category = CATEGORY_FUNC_METADATA_MNG,
+ doc = "The web service url for function workers"
+ )
private String functionWebServiceUrl;
+ @FieldContext(
+ category = CATEGORY_FUNC_METADATA_MNG,
+ doc = "The pulser binary service url that function metadata manager
talks to"
+ )
private String pulsarServiceUrl;
+ @FieldContext(
+ category = CATEGORY_FUNC_METADATA_MNG,
+ doc = "The pulsar web service url that function metadata manager talks
to"
+ )
private String pulsarWebServiceUrl;
+ @FieldContext(
+ category = CATEGORY_FUNC_METADATA_MNG,
+ doc = "The pulsar topic used for cluster coordination"
+ )
private String clusterCoordinationTopicName;
+ @FieldContext(
+ category = CATEGORY_FUNC_METADATA_MNG,
+ doc = "The pulsar namespace for storing metadata topics"
+ )
private String pulsarFunctionsNamespace;
+ @FieldContext(
+ category = CATEGORY_FUNC_METADATA_MNG,
+ doc = "The pulsar cluster name. Used for creating pulsar namespace
during worker initialization"
+ )
private String pulsarFunctionsCluster;
+ @FieldContext(
+ category = CATEGORY_FUNC_PKG,
+ doc = "The number of replicas for storing functions"
+ )
private int numFunctionPackageReplicas;
+ @FieldContext(
+ category = CATEGORY_FUNC_RUNTIME_MNG,
+ doc = "The directory to download functions by runtime manager"
+ )
private String downloadDirectory;
+ @FieldContext(
+ category = CATEGORY_STATE,
+ doc = "The service url of state storage"
+ )
private String stateStorageServiceUrl;
+ @FieldContext(
+ category = CATEGORY_FUNC_METADATA_MNG,
+ doc = "The pulsar topic used for storing function assignment
informations"
+ )
private String functionAssignmentTopicName;
+ @FieldContext(
+ category = CATEGORY_FUNC_METADATA_MNG,
+ doc = "The scheduler class used by assigning functions to workers"
+ )
private String schedulerClassName;
+ @FieldContext(
+ category = CATEGORY_FUNC_METADATA_MNG,
+ doc = "The frequency of failure checks, in milliseconds"
+ )
private long failureCheckFreqMs;
+ @FieldContext(
+ category = CATEGORY_FUNC_METADATA_MNG,
+ doc = "The reschedule timeout of function assignment, in milliseconds"
+ )
private long rescheduleTimeoutMs;
+ @FieldContext(
+ category = CATEGORY_FUNC_METADATA_MNG,
+ doc = "The max number of retries for initial broker reconnects when
function metadata manager"
+ + " tries to create producer on metadata topics"
+ )
private int initialBrokerReconnectMaxRetries;
+ @FieldContext(
+ category = CATEGORY_FUNC_METADATA_MNG,
+ doc = "The max number of retries for writing assignment to assignment
topic"
+ )
private int assignmentWriteMaxRetries;
+ @FieldContext(
+ category = CATEGORY_FUNC_RUNTIME_MNG,
+ doc = "The frequency of instance liveness check, in milliseconds"
+ )
private long instanceLivenessCheckFreqMs;
+ @FieldContext(
+ category = CATEGORY_CLIENT_SECURITY,
+ doc = "The authentication plugin used by function workers to talk to
brokers"
+ )
private String clientAuthenticationPlugin;
+ @FieldContext(
+ category = CATEGORY_CLIENT_SECURITY,
+ doc = "The parameters of the authentication plugin used by function
workers to talk to brokers"
+ )
private String clientAuthenticationParameters;
- // Frequency how often worker performs compaction on function-topics
+ @FieldContext(
+ category = CATEGORY_FUNC_METADATA_MNG,
+ doc = "Frequency how often worker performs compaction on
function-topics, in seconds"
+ )
private long topicCompactionFrequencySec = 30 * 60; // 30 minutes
/***** --- TLS --- ****/
+ @FieldContext(
+ category = CATEGORY_WORKER_SECURITY,
+ doc = "Enable TLS"
+ )
@Deprecated
private boolean tlsEnabled = false;
- // Path for the TLS certificate file
+ @FieldContext(
+ category = CATEGORY_WORKER_SECURITY,
+ doc = "Path for the TLS certificate file"
+ )
private String tlsCertificateFilePath;
- // Path for the TLS private key file
+ @FieldContext(
+ category = CATEGORY_WORKER_SECURITY,
+ doc = "Path for the TLS private key file"
+ )
private String tlsKeyFilePath;
- // Path for the trusted TLS certificate file
+ @FieldContext(
+ category = CATEGORY_SECURITY,
+ doc = "Path for the trusted TLS certificate file"
+ )
private String tlsTrustCertsFilePath = "";
- // Accept untrusted TLS certificate from client
+ @FieldContext(
+ category = CATEGORY_SECURITY,
+ doc = "Accept untrusted TLS certificate from client"
+ )
private boolean tlsAllowInsecureConnection = false;
+ @FieldContext(
+ category = CATEGORY_WORKER_SECURITY,
+ doc = "Require trusted client cert on connect"
+ )
private boolean tlsRequireTrustedClientCertOnConnect = false;
+ @FieldContext(
+ category = CATEGORY_CLIENT_SECURITY,
+ doc = "Whether to enable TLS when clients connect to broker"
+ )
// TLS for Functions -> Broker
private boolean useTls = false;
+ @FieldContext(
+ category = CATEGORY_SECURITY,
+ doc = "Whether to enable hostname verification on TLS connections"
+ )
private boolean tlsHostnameVerificationEnable = false;
- // Enforce authentication
+ @FieldContext(
+ category = CATEGORY_WORKER_SECURITY,
+ doc = "Enforce authentication"
+ )
private boolean authenticationEnabled = false;
- // Autentication provider name list, which is a list of class names
+ @FieldContext(
+ category = CATEGORY_WORKER_SECURITY,
+ doc = "Autentication provider name list, which is a list of class
names"
+ )
private Set<String> authenticationProviders = Sets.newTreeSet();
- // Enforce authorization on accessing functions admin-api
+ @FieldContext(
+ category = CATEGORY_WORKER_SECURITY,
+ doc = "Enforce authorization on accessing functions admin-api"
+ )
private boolean authorizationEnabled = false;
- // Role names that are treated as "super-user", meaning they will be able
to access any admin-api
+ @FieldContext(
+ category = CATEGORY_WORKER_SECURITY,
+ doc = "Role names that are treated as `super-user`, meaning they will
be able to access any admin-api"
+ )
private Set<String> superUserRoles = Sets.newTreeSet();
private Properties properties = new Properties();
@@ -116,8 +277,15 @@ public class WorkerConfig implements Serializable,
PulsarConfiguration {
@EqualsAndHashCode
@ToString
public static class ThreadContainerFactory {
+ @FieldContext(
+ doc = "The name of thread group running function threads"
+ )
private String threadGroupName;
}
+ @FieldContext(
+ category = CATEGORY_FUNC_RUNTIME_MNG,
+ doc = "Thread based runtime settings"
+ )
private ThreadContainerFactory threadContainerFactory;
@Data
@@ -126,12 +294,29 @@ public class WorkerConfig implements Serializable,
PulsarConfiguration {
@EqualsAndHashCode
@ToString
public static class ProcessContainerFactory {
+ @FieldContext(
+ doc = "The path to the java instance. Change the jar location only
when you put"
+ + " the java instance jar in a different location"
+ )
private String javaInstanceJarLocation;
+ @FieldContext(
+ doc = "The path to the python instance. Change the python instance
location only"
+ + " when you put the python instance in a different location"
+ )
private String pythonInstanceLocation;
+ @FieldContext(
+ doc = "The path to the log directory"
+ )
private String logDirectory;
- // the directory for dropping extra function dependencies
+ @FieldContext(
+ doc = "the directory for dropping extra function dependencies"
+ )
private String extraFunctionDependenciesDir;
}
+ @FieldContext(
+ category = CATEGORY_FUNC_RUNTIME_MNG,
+ doc = "Process based runtime settings"
+ )
private ProcessContainerFactory processContainerFactory;
@Data
@@ -140,33 +325,97 @@ public class WorkerConfig implements Serializable,
PulsarConfiguration {
@EqualsAndHashCode
@ToString
public static class KubernetesContainerFactory {
+ @FieldContext(
+ doc = "Uri to kubernetes cluster, leave it to empty and it will
use the kubernetes settings in"
+ + " function worker machine"
+ )
private String k8Uri;
+ @FieldContext(
+ doc = "The Kubernetes namespace to run the function instances. It
is `default`,"
+ + " if this setting is left to be empty"
+ )
private String jobNamespace;
+ @FieldContext(
+ doc = "The docker image used to run function instance. By default
it is `apachepulsar/pulsar`"
+ )
private String pulsarDockerImageName;
+ @FieldContext(
+ doc = "The root directory of pulsar home directory in the pulsar
docker image specified"
+ + " `pulsarDockerImageName`. By default it is under `/pulsar`.
If you are using your own"
+ + " customized image in `pulsarDockerImageName`, you need to
set this setting accordingly"
+ )
private String pulsarRootDir;
+ @FieldContext(
+ doc = "This setting only takes effects if `k8Uri` is set to null.
If your function worker is"
+ + " also running as a k8s pod, set this to `true` is let
function worker to submit functions to"
+ + " the same k8s cluster as function worker is running. Set
this to `false` if your function worker"
+ + " is not running as a k8s pod"
+ )
private Boolean submittingInsidePod;
+ @FieldContext(
+ doc = "The pulsar service url that pulsar functions should use to
connect to pulsar."
+ + " If it is not set, it will use the pulsar service url
configured in function worker."
+ )
private String pulsarServiceUrl;
+ @FieldContext(
+ doc = "The pulsar admin url that pulsar functions should use to
connect to pulsar."
+ + " If it is not set, it will use the pulsar admin url
configured in function worker."
+ )
private String pulsarAdminUrl;
+ @FieldContext(
+ doc = "The flag indicates to install user code dependencies.
(applied to python package)"
+ )
private Boolean installUserCodeDependencies;
+ @FieldContext(
+ doc = "The repository that pulsar functions use to download python
dependencies"
+ )
private String pythonDependencyRepository;
+ @FieldContext(
+ doc = "The repository that pulsar functions use to download extra
python dependencies"
+ )
private String pythonExtraDependencyRepository;
- // the directory for dropping extra function dependencies.
- // If it is not absolute path, it is relative to `pulsarRootDir`
+
+ @FieldContext(
+ doc = "the directory for dropping extra function dependencies. "
+ + "If it is not absolute path, it is relative to
`pulsarRootDir`"
+ )
private String extraFunctionDependenciesDir;
+ @FieldContext(
+ doc = "The custom labels that function worker uses to select the
nodes for pods"
+ )
private Map<String, String> customLabels;
+
+ @FieldContext(
+ doc = "The expected metrics collection interval, in seconds"
+ )
private Integer expectedMetricsCollectionInterval = 30;
- // Kubernetes Runtime will periodically checkback on
- // this configMap if defined and if there are any changes
- // to the kubernetes specific stuff, we apply those changes
+ @FieldContext(
+ doc = "Kubernetes Runtime will periodically checkback on"
+ + " this configMap if defined and if there are any changes"
+ + " to the kubernetes specific stuff, we apply those changes"
+ )
private String changeConfigMap;
+ @FieldContext(
+ doc = "The namespace for storing change config map"
+ )
private String changeConfigMapNamespace;
}
+ @FieldContext(
+ category = CATEGORY_FUNC_RUNTIME_MNG,
+ doc = "Kubernetes based runtime settings"
+ )
private KubernetesContainerFactory kubernetesContainerFactory;
- // The classname of the secrets provider configurator.
+ @FieldContext(
+ category = CATEGORY_FUNC_RUNTIME_MNG,
+ doc = "The classname of the secrets provider configurator."
+ )
private String secretsProviderConfiguratorClassName;
- // Any config the secret provider configurator might need. This is passed
on
- // to the init method of the secretproviderconfigurator
+ @FieldContext(
+ category = CATEGORY_FUNC_RUNTIME_MNG,
+ doc = "Any config the secret provider configurator might need.
\n\nThis is passed on"
+ + " to the init method of the secretproviderconfigurator"
+ )
private Map<String, String> secretsProviderConfiguratorConfig;
public String getFunctionMetadataTopic() {
@@ -216,4 +465,4 @@ public class WorkerConfig implements Serializable,
PulsarConfiguration {
public void setProperties(Properties properties) {
this.properties = properties;
}
-}
\ No newline at end of file
+}