Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2702#discussion_r197445797
--- Diff:
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java
---
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+public class StandardPulsarClientService extends AbstractControllerService
implements PulsarClientService {
+
+ public static final PropertyDescriptor PULSAR_SERVICE_URL = new
PropertyDescriptor
+ .Builder().name("PULSAR_SERVICE_URL")
+ .displayName("Pulsar Service URL")
+ .description("URL for the Pulsar cluster, e.g localhost:6650")
+ .required(true)
+ .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor ALLOW_TLS_INSECURE_CONNECTION =
new PropertyDescriptor.Builder()
+ .name("Allow TLS insecure conneciton")
+ .displayName("Allow TLS insecure conneciton")
+ .description("Whether the Pulsar client will accept untrusted
TLS certificate from broker or not.")
+ .required(false)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue(Boolean.FALSE.toString())
+ .build();
+
+ public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS =
new PropertyDescriptor.Builder()
+ .name("Maximum concurrent lookup-requests")
+ .description("Number of concurrent lookup-requests allowed on
each broker-connection to prevent "
+ + "overload on broker. (default: 5000) It should be
configured with higher value only in case "
+ + "of it requires to produce/subscribe on thousands of
topics")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("5000")
+ .build();
+
+ public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new
PropertyDescriptor.Builder()
+ .name("Maximum connects per Pulsar broker")
+ .description("Sets the max number of connection that the
client library will open to a single broker.\n" +
+ "By default, the connection pool will use a single
connection for all the producers and consumers. " +
+ "Increasing this parameter may improve throughput when
using many producers over a high latency connection")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("1")
+ .build();
+
+ public static final PropertyDescriptor IO_THREADS = new
PropertyDescriptor.Builder()
+ .name("I/O Threads")
+ .description("The number of threads to be used for handling
connections to brokers (default: 1 thread)")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("1")
+ .build();
+
+ public static final PropertyDescriptor KEEP_ALIVE_INTERVAL = new
PropertyDescriptor.Builder()
+ .name("Keep Alive interval")
+ .displayName("Keep Alive interval")
+ .description("The keep alive interval in seconds for each
client-broker-connection. (default: 30).")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("30")
+ .build();
+
+ public static final PropertyDescriptor LISTENER_THREADS = new
PropertyDescriptor.Builder()
+ .name("Listener Threads")
+ .description("The number of threads to be used for message
listeners (default: 1 thread)")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("1")
+ .build();
+
+ public static final PropertyDescriptor MAXIMUM_LOOKUP_REQUESTS = new
PropertyDescriptor.Builder()
+ .name("Maximum lookup requests")
+ .description("Number of max lookup-requests allowed on each
broker-connection to prevent overload on broker."
+ + "(default: 50000) It should be bigger than
maxConcurrentLookupRequests. ")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("50000")
+ .build();
+
+ public static final PropertyDescriptor MAXIMUM_REJECTED_REQUESTS = new
PropertyDescriptor.Builder()
+ .name("Maximum rejected requests per connection")
+ .description("Max number of broker-rejected requests in a
certain time-frame (30 seconds) after " +
+ "which current connection will be closed and client
creates a new connection that give " +
+ "chance to connect a different broker (default: 50)")
+ .required(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .defaultValue("50")
+ .build();
+
+ public static final PropertyDescriptor OPERATION_TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("Operation Timeout")
+ .description("Producer-create, subscribe and unsubscribe
operations will be retried until this " +
+ "interval, after which the operation will be maked as
failed (default: 30 seconds)")
--- End diff --
Typo: "maked" should be "marked"
---