Github user david-streamlio commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2882#discussion_r207644839
--- Diff:
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java
---
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+public class StandardPulsarClientService extends AbstractControllerService
implements PulsarClientService {
+
+ public static final PropertyDescriptor PULSAR_SERVICE_URL = new
PropertyDescriptor
+ .Builder().name("PULSAR_SERVICE_URL")
+ .displayName("Pulsar Service URL")
+ .description("URL for the Pulsar cluster, e.g localhost:6650")
+ .required(true)
+ .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor ALLOW_TLS_INSECURE_CONNECTION =
new PropertyDescriptor.Builder()
+ .name("Allow TLS insecure connection")
+ .displayName("Allow TLS insecure connection")
+ .description("Whether the Pulsar client will accept untrusted
TLS certificate from broker or not.")
+ .required(false)
+ .allowableValues("true", "false")
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("false")
+ .build();
+
+ public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS =
new PropertyDescriptor.Builder()
+ .name("Maximum concurrent lookup-requests")
+ .description("Number of concurrent lookup-requests allowed on
each broker-connection.")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("5000")
+ .build();
+
+ public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new
PropertyDescriptor.Builder()
+ .name("Maximum connects per Pulsar broker")
+ .description("Sets the max number of connection that the
client library will open to a single broker.\n" +
+ "By default, the connection pool will use a single
connection for all the producers and consumers. " +
+ "Increasing this parameter may improve throughput when
using many producers over a high latency connection")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("1")
+ .build();
+
+ public static final PropertyDescriptor IO_THREADS = new
PropertyDescriptor.Builder()
+ .name("I/O Threads")
+ .description("The number of threads to be used for handling
connections to brokers.")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("1")
+ .build();
+
+ public static final PropertyDescriptor KEEP_ALIVE_INTERVAL = new
PropertyDescriptor.Builder()
+ .name("Keep Alive interval")
+ .displayName("Keep Alive interval")
+ .description("The keep alive interval in seconds for each
client-broker-connection.")
+ .required(false)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("30 sec")
+ .build();
+
+ public static final PropertyDescriptor LISTENER_THREADS = new
PropertyDescriptor.Builder()
+ .name("Listener Threads")
+ .description("The number of threads to be used for message
listeners")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("1")
+ .build();
+
+ public static final PropertyDescriptor MAXIMUM_LOOKUP_REQUESTS = new
PropertyDescriptor.Builder()
+ .name("Maximum lookup requests")
+ .description("Number of max lookup-requests allowed on each
broker-connection. To prevent overload on broker,"
+ + "it should be greater than the 'Maximum concurrent
lookup-requests' property value.")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("50000")
+ .build();
+
+ public static final PropertyDescriptor MAXIMUM_REJECTED_REQUESTS = new
PropertyDescriptor.Builder()
+ .name("Maximum rejected requests per connection")
+ .description("Max number of broker-rejected requests in a
certain time-frame after " +
+ "which current connection will be closed and client
creates a new connection that give " +
+ "chance to connect a different broker")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("50")
+ .build();
+
+ public static final PropertyDescriptor OPERATION_TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("Operation Timeout")
+ .description("Producer-create, subscribe and unsubscribe
operations will be retried until this " +
+ "interval, after which the operation will be marked as
failed")
+ .required(false)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("30 sec")
+ .build();
+
+ public static final PropertyDescriptor STATS_INTERVAL = new
PropertyDescriptor.Builder()
+ .name("Stats interval")
+ .description("The interval between each stat infomation
update. It should be set to at least 1 second")
+ .required(false)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("60 sec")
+ .build();
+
+ public static final PropertyDescriptor TLS_TRUST_CERTS_FILE_PATH = new
PropertyDescriptor.Builder()
--- End diff --
The SSL Context service only encapsulates two values, a KeyStore file and a
TrustStore file, which I use to configure the broker endpoint. However, there
is another property that sets the path to the trusted TLS certificate file
(for the X509 certificate, etc.) that the broker uses, so I had to expose that
setting as a separate property.
https://pulsar.incubator.apache.org/api/client/ has detailed
explanations of these properties.
---