[ 
https://issues.apache.org/jira/browse/NIFI-1296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15248016#comment-15248016
 ] 

ASF GitHub Bot commented on NIFI-1296:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/345#discussion_r60256873
  
    --- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
 ---
    @@ -0,0 +1,254 @@
    +package org.apache.nifi.processors.kafka.pubsub;
    +
    +import java.io.Closeable;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.regex.Pattern;
    +
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyDescriptor.Builder;
    +import org.apache.nifi.components.ValidationContext;
    +import org.apache.nifi.components.ValidationResult;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.util.FormatUtils;
    +
    +abstract class AbstractKafkaProcessor<T extends Closeable> extends 
AbstractSessionFactoryProcessor {
    +
    +    private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
    +
    +    private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + 
"(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
    +
    +
    +    static final AllowableValue SEC_PLAINTEXT = new 
AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
    +    static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", 
"SSL");
    +    static final AllowableValue SEC_SASL_PLAINTEXT = new 
AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT");
    +    static final AllowableValue SEC_SASL_SSL = new 
AllowableValue("SASL_SSL", "SASL_SSL", "SASL_SSL");
    +
    +    static final PropertyDescriptor BOOTSTRAP_SERVERS = new 
PropertyDescriptor.Builder()
    +            .name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
    +            .displayName("Kafka Brokers")
    +            .description("A comma-separated list of known Kafka Brokers in 
the format <host>:<port>")
    +            .required(true)
    +            
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
    +            .expressionLanguageSupported(false)
    --- End diff --
    
    Also wondering if we should support Expression Language (EL) here, for the 
same reason I mentioned in the other comment. The brokers and the topic seem 
like the things most likely to change across dev/prod environments.


> Add capabilities to Kafka NAR to use new Kafka API (0.9)
> --------------------------------------------------------
>
>                 Key: NIFI-1296
>                 URL: https://issues.apache.org/jira/browse/NIFI-1296
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>    Affects Versions: 0.4.0
>            Reporter: Oleg Zhurakousky
>            Assignee: Oleg Zhurakousky
>             Fix For: 0.7.0
>
>
> Not sure when we can address this, but there is an interesting comment in 
> https://github.com/apache/nifi/pull/143. The usage of the new API may introduce 
> issues when running against older Kafka brokers (e.g., 0.8). Need to 
> investigate.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to