[
https://issues.apache.org/jira/browse/NIFI-1296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15248016#comment-15248016
]
ASF GitHub Bot commented on NIFI-1296:
--------------------------------------
Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/345#discussion_r60256873
--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java ---
@@ -0,0 +1,254 @@
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.FormatUtils;
+
+abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessionFactoryProcessor {
+
+    private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
+
+    private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
+
+
+    static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
+    static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
+    static final AllowableValue SEC_SASL_PLAINTEXT = new AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT");
+    static final AllowableValue SEC_SASL_SSL = new AllowableValue("SASL_SSL", "SASL_SSL", "SASL_SSL");
+
+    static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
+            .name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
+            .displayName("Kafka Brokers")
+            .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
+            .required(true)
+            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
+            .expressionLanguageSupported(false)
--- End diff --
I'm also wondering whether we should support Expression Language (EL) here,
for the same reason I mentioned in the other comment: the brokers and the
topic seem like the values most likely to change across dev/prod environments.
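One thing that may be worth checking if EL gets enabled on this property: the regex validator in the diff expects a literal <host>:<port> list, so an unresolved expression would not match BROKER_REGEX on its own. The plain-Java sketch below only illustrates that point. The two regex constants are copied from the diff; the class name BrokerListCheck, the resolve() helper, and the kafka.brokers variable name are hypothetical, and resolve() is a crude stand-in for substitution, not NiFi's Expression Language. Whether NiFi's own validator skips values containing EL is not checked here.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper class, for illustration only; not part of the pull request.
final class BrokerListCheck {

    // Copied verbatim from AbstractKafkaProcessor in the diff above.
    static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
    static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";

    private static final Pattern BROKERS = Pattern.compile(BROKER_REGEX);

    // Assumption: a simplified ${name} placeholder syntax, NOT NiFi's real EL grammar.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // True when the whole value matches the broker-list regex.
    static boolean isValidBrokerList(String value) {
        return BROKERS.matcher(value).matches();
    }

    // Replaces each ${name} with its value from vars (empty string if absent).
    static String resolve(String value, Map<String, String> vars) {
        Matcher m = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String replacement = vars.containsKey(m.group(1)) ? vars.get(m.group(1)) : "";
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Hypothetical per-environment variable; in NiFi this might come from a variable registry.
        Map<String, String> prod = Collections.singletonMap("kafka.brokers", "prod1:9092, prod2:9092");
        String configured = "${kafka.brokers}";

        System.out.println(isValidBrokerList("localhost:9092"));
        System.out.println(isValidBrokerList(configured));
        System.out.println(isValidBrokerList(resolve(configured, prod)));
    }
}
```

If this holds for the real validator as well, the check would either need to run against the evaluated value or tolerate expressions in the raw one.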
> Add capabilities to Kafka NAR to use new Kafka API (0.9)
> --------------------------------------------------------
>
> Key: NIFI-1296
> URL: https://issues.apache.org/jira/browse/NIFI-1296
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Affects Versions: 0.4.0
> Reporter: Oleg Zhurakousky
> Assignee: Oleg Zhurakousky
> Fix For: 0.7.0
>
>
> Not sure when we can address this, but there is an interesting comment in
> https://github.com/apache/nifi/pull/143. Using the new API may introduce
> issues when running against older Kafka brokers (e.g., 0.8). Need to
> investigate.