[ 
https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568649#comment-16568649
 ] 

ASF GitHub Bot commented on NIFI-4914:
--------------------------------------

Github user david-streamlio commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2882#discussion_r207644839
  
    --- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java
 ---
    @@ -0,0 +1,285 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar;
    +
    +import java.net.MalformedURLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.annotation.lifecycle.OnShutdown;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.apache.pulsar.client.api.ClientBuilder;
    +import org.apache.pulsar.client.api.PulsarClient;
    +import org.apache.pulsar.client.api.PulsarClientException;
    +import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
    +import org.apache.pulsar.client.impl.auth.AuthenticationTls;
    +
    +public class StandardPulsarClientService extends AbstractControllerService 
implements PulsarClientService {
    +
    +    public static final PropertyDescriptor PULSAR_SERVICE_URL = new 
PropertyDescriptor
    +            .Builder().name("PULSAR_SERVICE_URL")
    +            .displayName("Pulsar Service URL")
    +            .description("URL for the Pulsar cluster, e.g localhost:6650")
    +            .required(true)
    +            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .build();
    +
    +    public static final PropertyDescriptor ALLOW_TLS_INSECURE_CONNECTION = 
new PropertyDescriptor.Builder()
    +            .name("Allow TLS insecure connection")
    +            .displayName("Allow TLS insecure connection")
    +            .description("Whether the Pulsar client will accept untrusted 
TLS certificate from broker or not.")
    +            .required(false)
    +            .allowableValues("true", "false")
    +            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .defaultValue("false")
    +            .build();
    +
    +    public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = 
new PropertyDescriptor.Builder()
    +            .name("Maximum concurrent lookup-requests")
    +            .description("Number of concurrent lookup-requests allowed on 
each broker-connection.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new 
PropertyDescriptor.Builder()
    +            .name("Maximum connects per Pulsar broker")
    +            .description("Sets the max number of connection that the 
client library will open to a single broker.\n" +
    +                    "By default, the connection pool will use a single 
connection for all the producers and consumers. " +
    +                    "Increasing this parameter may improve throughput when 
using many producers over a high latency connection")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor IO_THREADS = new 
PropertyDescriptor.Builder()
    +            .name("I/O Threads")
    +            .description("The number of threads to be used for handling 
connections to brokers.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor KEEP_ALIVE_INTERVAL = new 
PropertyDescriptor.Builder()
    +            .name("Keep Alive interval")
    +            .displayName("Keep Alive interval")
    +            .description("The keep alive interval in seconds for each 
client-broker-connection.")
    +            .required(false)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .defaultValue("30 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor LISTENER_THREADS = new 
PropertyDescriptor.Builder()
    +            .name("Listener Threads")
    +            .description("The number of threads to be used for message 
listeners")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .defaultValue("1")
    +            .build();
    +
    +    public static final PropertyDescriptor MAXIMUM_LOOKUP_REQUESTS = new 
PropertyDescriptor.Builder()
    +            .name("Maximum lookup requests")
    +            .description("Number of max lookup-requests allowed on each 
broker-connection. To prevent overload on broker,"
    +                    + "it should be greater than the 'Maximum concurrent 
lookup-requests' property value.")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("50000")
    +            .build();
    +
    +    public static final PropertyDescriptor MAXIMUM_REJECTED_REQUESTS = new 
PropertyDescriptor.Builder()
    +            .name("Maximum rejected requests per connection")
    +            .description("Max number of broker-rejected requests in a 
certain time-frame after " +
    +                "which current connection will be closed and client 
creates a new connection that give " +
    +                "chance to connect a different broker")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .defaultValue("50")
    +            .build();
    +
    +    public static final PropertyDescriptor OPERATION_TIMEOUT = new 
PropertyDescriptor.Builder()
    +            .name("Operation Timeout")
    +            .description("Producer-create, subscribe and unsubscribe 
operations will be retried until this " +
    +                "interval, after which the operation will be marked as 
failed")
    +            .required(false)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .defaultValue("30 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor STATS_INTERVAL = new 
PropertyDescriptor.Builder()
    +            .name("Stats interval")
    +            .description("The interval between each stat infomation 
update. It should be set to at least 1 second")
    +            .required(false)
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .defaultValue("60 sec")
    +            .build();
    +
    +    public static final PropertyDescriptor TLS_TRUST_CERTS_FILE_PATH = new 
PropertyDescriptor.Builder()
    --- End diff --
    
    The SSL Context service only encapsulates two value, a KeyStore file and a 
TrustStore file, which I use to configure the broker endpoint. But there is 
also another property used to set the path to the trusted TLS certificate file 
(for the X509 certificate, etc) the broker uses, so I had to expose that 
setting in a separate property.
    
    https://pulsar.incubator.apache.org/api/client/ has the detailed 
explanations for these properties. 


> Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, 
> PublishPulsarRecord
> ------------------------------------------------------------------------------------------
>
>                 Key: NIFI-4914
>                 URL: https://issues.apache.org/jira/browse/NIFI-4914
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 1.6.0
>            Reporter: David Kjerrumgaard
>            Priority: Minor
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Create record-based processors for Apache Pulsar 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to