http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java new file mode 100644 index 0000000..a7bd96d --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.io.Closeable; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriterFactory; + +/** + * A pool of Kafka Consumers for a given topic. Consumers can be obtained by + * calling 'obtainConsumer'. Once closed the pool is ready to be immediately + * used again. 
+ */ +public class ConsumerPool implements Closeable { + + private final BlockingQueue<SimpleConsumerLease> pooledLeases; + private final List<String> topics; + private final Pattern topicPattern; + private final Map<String, Object> kafkaProperties; + private final long maxWaitMillis; + private final ComponentLog logger; + private final byte[] demarcatorBytes; + private final String keyEncoding; + private final String securityProtocol; + private final String bootstrapServers; + private final boolean honorTransactions; + private final RecordReaderFactory readerFactory; + private final RecordSetWriterFactory writerFactory; + private final Charset headerCharacterSet; + private final Pattern headerNamePattern; + private final AtomicLong consumerCreatedCountRef = new AtomicLong(); + private final AtomicLong consumerClosedCountRef = new AtomicLong(); + private final AtomicLong leasesObtainedCountRef = new AtomicLong(); + + /** + * Creates a pool of KafkaConsumer objects that will grow up to the maximum + * indicated threads from the given context. Consumers are lazily + * initialized. We may elect to not create up to the maximum number of + * configured consumers if the broker reported lag time for all topics is + * below a certain threshold. 
+ * + * @param maxConcurrentLeases max allowable consumers at once + * @param demarcator bytes to use as demarcator between messages; null or + * empty means no demarcator + * @param kafkaProperties properties to use to initialize kafka consumers + * @param topics the topics to subscribe to + * @param maxWaitMillis maximum time to wait for a given lease to acquire + * data before committing + * @param keyEncoding the encoding to use for the key of a kafka message if + * found + * @param securityProtocol the security protocol used + * @param bootstrapServers the bootstrap servers + * @param logger the logger to report any errors/warnings + */ + public ConsumerPool( + final int maxConcurrentLeases, + final byte[] demarcator, + final Map<String, Object> kafkaProperties, + final List<String> topics, + final long maxWaitMillis, + final String keyEncoding, + final String securityProtocol, + final String bootstrapServers, + final ComponentLog logger, + final boolean honorTransactions, + final Charset headerCharacterSet, + final Pattern headerNamePattern) { + this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases); + this.maxWaitMillis = maxWaitMillis; + this.logger = logger; + this.demarcatorBytes = demarcator; + this.keyEncoding = keyEncoding; + this.securityProtocol = securityProtocol; + this.bootstrapServers = bootstrapServers; + this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties); + this.topics = Collections.unmodifiableList(topics); + this.topicPattern = null; + this.readerFactory = null; + this.writerFactory = null; + this.honorTransactions = honorTransactions; + this.headerCharacterSet = headerCharacterSet; + this.headerNamePattern = headerNamePattern; + } + + public ConsumerPool( + final int maxConcurrentLeases, + final byte[] demarcator, + final Map<String, Object> kafkaProperties, + final Pattern topics, + final long maxWaitMillis, + final String keyEncoding, + final String securityProtocol, + final String bootstrapServers, + final 
ComponentLog logger, + final boolean honorTransactions, + final Charset headerCharacterSet, + final Pattern headerNamePattern) { + this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases); + this.maxWaitMillis = maxWaitMillis; + this.logger = logger; + this.demarcatorBytes = demarcator; + this.keyEncoding = keyEncoding; + this.securityProtocol = securityProtocol; + this.bootstrapServers = bootstrapServers; + this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties); + this.topics = null; + this.topicPattern = topics; + this.readerFactory = null; + this.writerFactory = null; + this.honorTransactions = honorTransactions; + this.headerCharacterSet = headerCharacterSet; + this.headerNamePattern = headerNamePattern; + } + + public ConsumerPool( + final int maxConcurrentLeases, + final RecordReaderFactory readerFactory, + final RecordSetWriterFactory writerFactory, + final Map<String, Object> kafkaProperties, + final Pattern topics, + final long maxWaitMillis, + final String securityProtocol, + final String bootstrapServers, + final ComponentLog logger, + final boolean honorTransactions, + final Charset headerCharacterSet, + final Pattern headerNamePattern) { + this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases); + this.maxWaitMillis = maxWaitMillis; + this.logger = logger; + this.demarcatorBytes = null; + this.keyEncoding = null; + this.readerFactory = readerFactory; + this.writerFactory = writerFactory; + this.securityProtocol = securityProtocol; + this.bootstrapServers = bootstrapServers; + this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties); + this.topics = null; + this.topicPattern = topics; + this.honorTransactions = honorTransactions; + this.headerCharacterSet = headerCharacterSet; + this.headerNamePattern = headerNamePattern; + } + + public ConsumerPool( + final int maxConcurrentLeases, + final RecordReaderFactory readerFactory, + final RecordSetWriterFactory writerFactory, + final Map<String, Object> 
kafkaProperties, + final List<String> topics, + final long maxWaitMillis, + final String securityProtocol, + final String bootstrapServers, + final ComponentLog logger, + final boolean honorTransactions, + final Charset headerCharacterSet, + final Pattern headerNamePattern) { + this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases); + this.maxWaitMillis = maxWaitMillis; + this.logger = logger; + this.demarcatorBytes = null; + this.keyEncoding = null; + this.readerFactory = readerFactory; + this.writerFactory = writerFactory; + this.securityProtocol = securityProtocol; + this.bootstrapServers = bootstrapServers; + this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties); + this.topics = topics; + this.topicPattern = null; + this.honorTransactions = honorTransactions; + this.headerCharacterSet = headerCharacterSet; + this.headerNamePattern = headerNamePattern; + } + + /** + * Obtains a consumer from the pool if one is available or lazily + * initializes a new one if deemed necessary. + * + * @param session the session for which the consumer lease will be + * associated + * @param processContext the ProcessContext for which the consumer + * lease will be associated + * @return consumer to use or null if not available or necessary + */ + public ConsumerLease obtainConsumer(final ProcessSession session, final ProcessContext processContext) { + SimpleConsumerLease lease = pooledLeases.poll(); + if (lease == null) { + final Consumer<byte[], byte[]> consumer = createKafkaConsumer(); + consumerCreatedCountRef.incrementAndGet(); + /** + * For now return a new consumer lease. But we could later elect to + * have this return null if we determine the broker indicates that + * the lag time on all topics being monitored is sufficiently low. 
+ * For now we should encourage conservative use of threads because + * having too many means we'll have at best useless threads sitting + * around doing frequent network calls and at worst having consumers + * sitting idle which could prompt excessive rebalances. + */ + lease = new SimpleConsumerLease(consumer); + /** + * This subscription tightly couples the lease to the given + * consumer. They cannot be separated from then on. + */ + if (topics != null) { + consumer.subscribe(topics, lease); + } else { + consumer.subscribe(topicPattern, lease); + } + } + lease.setProcessSession(session, processContext); + + leasesObtainedCountRef.incrementAndGet(); + return lease; + } + + /** + * Exposed as protected method for easier unit testing + * + * @return consumer + * @throws KafkaException if unable to subscribe to the given topics + */ + protected Consumer<byte[], byte[]> createKafkaConsumer() { + final Map<String, Object> properties = new HashMap<>(kafkaProperties); + if (honorTransactions) { + properties.put("isolation.level", "read_committed"); + } else { + properties.put("isolation.level", "read_uncommitted"); + } + final Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties); + return consumer; + } + + /** + * Closes all consumers in the pool. Can be safely called repeatedly. 
+ */ + @Override + public void close() { + final List<SimpleConsumerLease> leases = new ArrayList<>(); + pooledLeases.drainTo(leases); + leases.stream().forEach((lease) -> { + lease.close(true); + }); + } + + private void closeConsumer(final Consumer<?, ?> consumer) { + consumerClosedCountRef.incrementAndGet(); + try { + consumer.unsubscribe(); + } catch (Exception e) { + logger.warn("Failed while unsubscribing " + consumer, e); + } + + try { + consumer.close(); + } catch (Exception e) { + logger.warn("Failed while closing " + consumer, e); + } + } + + PoolStats getPoolStats() { + return new PoolStats(consumerCreatedCountRef.get(), consumerClosedCountRef.get(), leasesObtainedCountRef.get()); + } + + private class SimpleConsumerLease extends ConsumerLease { + + private final Consumer<byte[], byte[]> consumer; + private volatile ProcessSession session; + private volatile ProcessContext processContext; + private volatile boolean closedConsumer; + + private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) { + super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers, + readerFactory, writerFactory, logger, headerCharacterSet, headerNamePattern); + this.consumer = consumer; + } + + void setProcessSession(final ProcessSession session, final ProcessContext context) { + this.session = session; + this.processContext = context; + } + + @Override + public void yield() { + if (processContext != null) { + processContext.yield(); + } + } + + @Override + public ProcessSession getProcessSession() { + return session; + } + + @Override + public void close() { + super.close(); + close(false); + } + + public void close(final boolean forceClose) { + if (closedConsumer) { + return; + } + super.close(); + if (session != null) { + session.rollback(); + setProcessSession(null, null); + } + if (forceClose || isPoisoned() || !pooledLeases.offer(this)) { + closedConsumer = true; + closeConsumer(consumer); + } + } + } + + static final class 
PoolStats { + + final long consumerCreatedCount; + final long consumerClosedCount; + final long leasesObtainedCount; + + PoolStats( + final long consumerCreatedCount, + final long consumerClosedCount, + final long leasesObtainedCount + ) { + this.consumerCreatedCount = consumerCreatedCount; + this.consumerClosedCount = consumerClosedCount; + this.leasesObtainedCount = leasesObtainedCount; + } + + @Override + public String toString() { + return "Created Consumers [" + consumerCreatedCount + "]\n" + + "Closed Consumers [" + consumerClosedCount + "]\n" + + "Leases Obtained [" + leasesObtainedCount + "]\n"; + } + + } + +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java new file mode 100644 index 0000000..317b274 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.kafka.pubsub; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; + +public class InFlightMessageTracker { + private final ConcurrentMap<FlowFile, Counts> messageCountsByFlowFile = new ConcurrentHashMap<>(); + private final ConcurrentMap<FlowFile, Exception> failures = new ConcurrentHashMap<>(); + private final Object progressMutex = new Object(); + private final ComponentLog logger; + + public InFlightMessageTracker(final ComponentLog logger) { + this.logger = logger; + } + + public void incrementAcknowledgedCount(final FlowFile flowFile) { + final Counts counter = messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts()); + counter.incrementAcknowledgedCount(); + + synchronized (progressMutex) { + progressMutex.notify(); + } + } + + public void trackEmpty(final FlowFile flowFile) { + messageCountsByFlowFile.putIfAbsent(flowFile, new Counts()); + } + + public int getAcknowledgedCount(final FlowFile flowFile) { + final Counts counter = messageCountsByFlowFile.get(flowFile); + return (counter == null) ? 0 : counter.getAcknowledgedCount(); + } + + public void incrementSentCount(final FlowFile flowFile) { + final Counts counter = messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts()); + counter.incrementSentCount(); + } + + public int getSentCount(final FlowFile flowFile) { + final Counts counter = messageCountsByFlowFile.get(flowFile); + return (counter == null) ? 
0 : counter.getSentCount(); + } + + public void fail(final FlowFile flowFile, final Exception exception) { + failures.putIfAbsent(flowFile, exception); + logger.error("Failed to send " + flowFile + " to Kafka", exception); + + synchronized (progressMutex) { + progressMutex.notify(); + } + } + + public Exception getFailure(final FlowFile flowFile) { + return failures.get(flowFile); + } + + public boolean isFailed(final FlowFile flowFile) { + return getFailure(flowFile) != null; + } + + public void reset() { + messageCountsByFlowFile.clear(); + failures.clear(); + } + + public PublishResult failOutstanding(final Exception exception) { + messageCountsByFlowFile.keySet().stream() + .filter(ff -> !isComplete(ff)) + .filter(ff -> !failures.containsKey(ff)) + .forEach(ff -> failures.put(ff, exception)); + + return createPublishResult(); + } + + private boolean isComplete(final FlowFile flowFile) { + final Counts counts = messageCountsByFlowFile.get(flowFile); + if (counts.getAcknowledgedCount() == counts.getSentCount()) { + // all messages received successfully. 
+ return true; + } + + if (failures.containsKey(flowFile)) { + // FlowFile failed so is complete + return true; + } + + return false; + } + + private boolean isComplete() { + return messageCountsByFlowFile.keySet().stream() + .allMatch(flowFile -> isComplete(flowFile)); + } + + void awaitCompletion(final long millis) throws InterruptedException, TimeoutException { + final long startTime = System.nanoTime(); + final long maxTime = startTime + TimeUnit.MILLISECONDS.toNanos(millis); + + while (System.nanoTime() < maxTime) { + synchronized (progressMutex) { + if (isComplete()) { + return; + } + + progressMutex.wait(millis); + } + } + + throw new TimeoutException(); + } + + + PublishResult createPublishResult() { + return new PublishResult() { + @Override + public boolean isFailure() { + return !failures.isEmpty(); + } + + @Override + public int getSuccessfulMessageCount(final FlowFile flowFile) { + return getAcknowledgedCount(flowFile); + } + + @Override + public Exception getReasonForFailure(final FlowFile flowFile) { + return getFailure(flowFile); + } + }; + } + + public static class Counts { + private final AtomicInteger sentCount = new AtomicInteger(0); + private final AtomicInteger acknowledgedCount = new AtomicInteger(0); + + public void incrementSentCount() { + sentCount.incrementAndGet(); + } + + public void incrementAcknowledgedCount() { + acknowledgedCount.incrementAndGet(); + } + + public int getAcknowledgedCount() { + return acknowledgedCount.get(); + } + + public int getSentCount() { + return sentCount.get(); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java new file mode 100644 index 0000000..de28995 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.util.FormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class KafkaProcessorUtils { + + final Logger logger = LoggerFactory.getLogger(this.getClass()); + + static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string."); + static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded", + "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters"); + + static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+"); + + static final String KAFKA_KEY = "kafka.key"; + static final String KAFKA_TOPIC = 
"kafka.topic"; + static final String KAFKA_PARTITION = "kafka.partition"; + static final String KAFKA_OFFSET = "kafka.offset"; + static final String KAFKA_COUNT = "kafka.count"; + static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT"); + static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL"); + static final AllowableValue SEC_SASL_PLAINTEXT = new AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT"); + static final AllowableValue SEC_SASL_SSL = new AllowableValue("SASL_SSL", "SASL_SSL", "SASL_SSL"); + + static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder() + .name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) + .displayName("Kafka Brokers") + .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>") + .required(true) + .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .expressionLanguageSupported(true) + .defaultValue("localhost:9092") + .build(); + static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder() + .name("security.protocol") + .displayName("Security Protocol") + .description("Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.") + .required(true) + .expressionLanguageSupported(false) + .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL) + .defaultValue(SEC_PLAINTEXT.getValue()) + .build(); + static final PropertyDescriptor KERBEROS_PRINCIPLE = new PropertyDescriptor.Builder() + .name("sasl.kerberos.service.name") + .displayName("Kerberos Service Name") + .description("The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. " + + "Corresponds to Kafka's 'security.protocol' property." 
+ + "It is ignored unless one of the SASL options of the <Security Protocol> are selected.") + .required(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder() + .name("sasl.kerberos.principal") + .displayName("Kerberos Principal") + .description("The Kerberos principal that will be used to connect to brokers. If not set, it is expected to set a JAAS configuration file " + + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.") + .required(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder() + .name("sasl.kerberos.keytab") + .displayName("Kerberos Keytab") + .description("The Kerberos keytab that will be used to connect to brokers. If not set, it is expected to set a JAAS configuration file " + + "in the JVM properties defined in the bootstrap.conf file. 
This principal will be set into 'sasl.jaas.config' Kafka's property.") + .required(false) + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("ssl.context.service") + .displayName("SSL Context Service") + .description("Specifies the SSL Context Service to use for communicating with Kafka.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + + static List<PropertyDescriptor> getCommonPropertyDescriptors() { + return Arrays.asList( + BOOTSTRAP_SERVERS, + SECURITY_PROTOCOL, + KERBEROS_PRINCIPLE, + USER_PRINCIPAL, + USER_KEYTAB, + SSL_CONTEXT_SERVICE + ); + } + + static Collection<ValidationResult> validateCommonProperties(final ValidationContext validationContext) { + List<ValidationResult> results = new ArrayList<>(); + + String securityProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue(); + + /* + * validates that if one of SASL (Kerberos) option is selected for + * security protocol, then Kerberos principal is provided as well + */ + if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) { + String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).getValue(); + if (kerberosPrincipal == null || kerberosPrincipal.trim().length() == 0) { + results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) + .explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <" + + SECURITY_PROTOCOL.getDisplayName() + "> is configured as '" + + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.") + .build()); + } + + String userKeytab = validationContext.getProperty(USER_KEYTAB).getValue(); + String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).getValue(); + if((StringUtils.isBlank(userKeytab) && 
!StringUtils.isBlank(userPrincipal)) + || (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) { + results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) + .explanation("Both <" + USER_KEYTAB.getDisplayName() + "> and <" + USER_PRINCIPAL.getDisplayName() + "> " + + "must be set.") + .build()); + } + } + + //If SSL or SASL_SSL then CS must be set. + final boolean sslProtocol = SEC_SSL.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol); + final boolean csSet = validationContext.getProperty(SSL_CONTEXT_SERVICE).isSet(); + if (csSet && !sslProtocol) { + results.add(new ValidationResult.Builder().subject(SECURITY_PROTOCOL.getDisplayName()).valid(false) + .explanation("If you set the SSL Controller Service you should also choose an SSL based security protocol.").build()); + } + if (!csSet && sslProtocol) { + results.add(new ValidationResult.Builder().subject(SSL_CONTEXT_SERVICE.getDisplayName()).valid(false) + .explanation("If you set to an SSL based protocol you need to set the SSL Controller Service").build()); + } + + final String enableAutoCommit = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).build()).getValue(); + if (enableAutoCommit != null && !enableAutoCommit.toLowerCase().equals("false")) { + results.add(new ValidationResult.Builder().subject(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) + .explanation("Enable auto commit must be false. 
It is managed by the processor.").build()); + } + + final String keySerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).build()).getValue(); + if (keySerializer != null && !ByteArraySerializer.class.getName().equals(keySerializer)) { + results.add(new ValidationResult.Builder().subject(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) + .explanation("Key Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build()); + } + + final String valueSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).build()).getValue(); + if (valueSerializer != null && !ByteArraySerializer.class.getName().equals(valueSerializer)) { + results.add(new ValidationResult.Builder().subject(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) + .explanation("Value Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + valueSerializer + "'").build()); + } + + final String keyDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).build()).getValue(); + if (keyDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(keyDeSerializer)) { + results.add(new ValidationResult.Builder().subject(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) + .explanation("Key De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + "'").build()); + } + + final String valueDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).build()).getValue(); + if (valueDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(valueDeSerializer)) { + results.add(new ValidationResult.Builder().subject(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) + .explanation("Value De-Serializer must be " + ByteArrayDeserializer.class.getName() + 
"' was '" + valueDeSerializer + "'").build()); + } + + return results; + } + + static final class KafkaConfigValidator implements Validator { + + final Class<?> classType; + + public KafkaConfigValidator(final Class<?> classType) { + this.classType = classType; + } + + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + final boolean knownValue = KafkaProcessorUtils.isStaticStringFieldNamePresent(subject, classType, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class); + return new ValidationResult.Builder().subject(subject).explanation("Must be a known configuration parameter for this kafka client").valid(knownValue).build(); + } + }; + + /** + * Builds transit URI for provenance event. The transit URI will be in the + * form of <security.protocol>://<bootstrap.servers>/topic + */ + static String buildTransitURI(String securityProtocol, String brokers, String topic) { + StringBuilder builder = new StringBuilder(); + builder.append(securityProtocol); + builder.append("://"); + builder.append(brokers); + builder.append("/"); + builder.append(topic); + return builder.toString(); + } + + + static void buildCommonKafkaProperties(final ProcessContext context, final Class<?> kafkaConfigClass, final Map<String, Object> mapToPopulate) { + for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) { + if (propertyDescriptor.equals(SSL_CONTEXT_SERVICE)) { + // Translate SSLContext Service configuration into Kafka properties + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + if (sslContextService != null && sslContextService.isKeyStoreConfigured()) { + mapToPopulate.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslContextService.getKeyStoreFile()); + mapToPopulate.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslContextService.getKeyStorePassword()); + final String keyPass = 
sslContextService.getKeyPassword() == null ? sslContextService.getKeyStorePassword() : sslContextService.getKeyPassword(); + mapToPopulate.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPass); + mapToPopulate.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, sslContextService.getKeyStoreType()); + } + + if (sslContextService != null && sslContextService.isTrustStoreConfigured()) { + mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslContextService.getTrustStoreFile()); + mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslContextService.getTrustStorePassword()); + mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslContextService.getTrustStoreType()); + } + } + + String propertyName = propertyDescriptor.getName(); + String propertyValue = propertyDescriptor.isExpressionLanguageSupported() + ? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue() + : context.getProperty(propertyDescriptor).getValue(); + + if (propertyValue != null && !propertyName.equals(USER_PRINCIPAL.getName()) && !propertyName.equals(USER_KEYTAB.getName())) { + // If the property name ends in ".ms" then it is a time period. 
We want to accept either an integer as number of milliseconds + // or the standard NiFi time period such as "5 secs" + if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation + propertyValue = String.valueOf(FormatUtils.getTimeDuration(propertyValue.trim(), TimeUnit.MILLISECONDS)); + } + + if (isStaticStringFieldNamePresent(propertyName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) { + mapToPopulate.put(propertyName, propertyValue); + } + } + } + + String securityProtocol = context.getProperty(SECURITY_PROTOCOL).getValue(); + if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) { + setJaasConfig(mapToPopulate, context); + } + } + + /** + * Method used to configure the 'sasl.jaas.config' property based on KAFKA-4259<br /> + * https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients<br /> + * <br /> + * It expects something with the following format: <br /> + * <br /> + * <LoginModuleClass> <ControlFlag> *(<OptionName>=<OptionValue>); <br /> + * ControlFlag = required / requisite / sufficient / optional + * + * @param mapToPopulate Map of configuration properties + * @param context Context + */ + private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) { + String keytab = context.getProperty(USER_KEYTAB).getValue(); + String principal = context.getProperty(USER_PRINCIPAL).getValue(); + String serviceName = context.getProperty(KERBEROS_PRINCIPLE).getValue(); + if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) { + mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required " + + "useTicketCache=false " + + "renewTicket=true " + + "serviceName=\"" + serviceName + "\" " + + "useKeyTab=true " + + "keyTab=\"" + keytab + "\" " + + 
"principal=\"" + principal + "\";"); + } + } + + private static boolean isStaticStringFieldNamePresent(final String name, final Class<?>... classes) { + return KafkaProcessorUtils.getPublicStaticStringFieldValues(classes).contains(name); + } + + private static Set<String> getPublicStaticStringFieldValues(final Class<?>... classes) { + final Set<String> strings = new HashSet<>(); + for (final Class<?> classType : classes) { + for (final Field field : classType.getDeclaredFields()) { + if (Modifier.isPublic(field.getModifiers()) && Modifier.isStatic(field.getModifiers()) && field.getType().equals(String.class)) { + try { + strings.add(String.valueOf(field.get(null))); + } catch (IllegalArgumentException | IllegalAccessException ex) { + //ignore + } + } + } + } + return strings; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java new file mode 100644 index 0000000..64ab4ce --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.kafka.pubsub;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

/**
 * Collection of implementation of common Kafka {@link Partitioner}s.
 */
public final class Partitioners {

    // Utility holder; never instantiated.
    private Partitioners() {
    }

    /**
     * {@link Partitioner} that implements 'round-robin' mechanism which evenly
     * distributes load between all available partitions.
     */
    public static class RoundRobinPartitioner implements Partitioner {

        // Rolling counter over the available partitions; guarded by the lock of next(...)
        private volatile int counter;

        @Override
        public void configure(final Map<String, ?> configs) {
            // nothing to configure
        }

        @Override
        public int partition(final String topic, final Object key, final byte[] keyBytes, final Object value, final byte[] valueBytes, final Cluster cluster) {
            final int availablePartitions = cluster.availablePartitionsForTopic(topic).size();
            return next(availablePartitions);
        }

        @Override
        public void close() {
            // no resources to release
        }

        // Returns the current counter value, wrapping to 0 once it reaches the
        // number of partitions, then advances it for the next call.
        private synchronized int next(final int numberOfPartitions) {
            final int chosen = counter < numberOfPartitions ? counter : 0;
            counter = chosen + 1;
            return chosen;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java new file mode 100644 index 0000000..c125d62 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java @@ -0,0 +1,443 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.processors.kafka.pubsub;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.FlowFileFilters;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;

/**
 * NiFi processor that reads record-oriented FlowFile content with a configured Record Reader,
 * re-serializes each record with the configured Record Writer, and publishes the records to
 * Kafka via a pooled producer ({@code PublisherPool}/{@code PublisherLease} are project classes;
 * their behavior is inferred here only from the calls made below).
 */
@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"})
@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 1.0 Producer API. "
    + "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
    + "The complementary NiFi processor for fetching messages is ConsumeKafkaRecord_1_0.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
    description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
        + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
        + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
    + "FlowFiles that are routed to success.")
@SeeAlso({PublishKafka_1_0.class, ConsumeKafka_1_0.class, ConsumeKafkaRecord_1_0.class})
public class PublishKafkaRecord_1_0 extends AbstractProcessor {
    // Name of the attribute written onto successful FlowFiles with the sent-message count.
    protected static final String MSG_COUNT = "msg.count";

    static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
        "FlowFile will be routed to failure unless the message is replicated to the appropriate "
            + "number of Kafka Nodes according to the Topic configuration");
    static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery",
        "FlowFile will be routed to success if the message is received by a single Kafka node, "
            + "whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> "
            + "but can result in data loss if a Kafka node crashes");
    static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort",
        "FlowFile will be routed to success after successfully writing the content to a Kafka node, "
            + "without waiting for a response. This provides the best performance but may result in data loss.");

    static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(Partitioners.RoundRobinPartitioner.class.getName(),
        Partitioners.RoundRobinPartitioner.class.getSimpleName(),
        "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, "
            + "the next Partition to Partition 2, and so on, wrapping as necessary.")
    ;
    static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
        "DefaultPartitioner", "Messages will be assigned to random partitions.");

    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
        "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");

    static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
        .name("topic")
        .displayName("Topic Name")
        .description("The name of the Kafka Topic to publish to.")
        .required(true)
        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
        .expressionLanguageSupported(true)
        .build();

    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
        .name("record-reader")
        .displayName("Record Reader")
        .description("The Record Reader to use for incoming FlowFiles")
        .identifiesControllerService(RecordReaderFactory.class)
        .expressionLanguageSupported(false)
        .required(true)
        .build();

    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
        .name("record-writer")
        .displayName("Record Writer")
        .description("The Record Writer to use in order to serialize the data before sending to Kafka")
        .identifiesControllerService(RecordSetWriterFactory.class)
        .expressionLanguageSupported(false)
        .required(true)
        .build();

    static final PropertyDescriptor MESSAGE_KEY_FIELD = new PropertyDescriptor.Builder()
        .name("message-key-field")
        .displayName("Message Key Field")
        .description("The name of a field in the Input Records that should be used as the Key for the Kafka message.")
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .expressionLanguageSupported(true)
        .required(false)
        .build();

    static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
        .name("acks")
        .displayName("Delivery Guarantee")
        .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
        .required(true)
        .expressionLanguageSupported(false)
        .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
        .defaultValue(DELIVERY_BEST_EFFORT.getValue())
        .build();

    static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder()
        .name("max.block.ms")
        .displayName("Max Metadata Wait Time")
        .description("The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the "
            + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
        .required(true)
        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
        .expressionLanguageSupported(true)
        .defaultValue("5 sec")
        .build();

    static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder()
        .name("ack.wait.time")
        .displayName("Acknowledgment Wait Time")
        .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. "
            + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.")
        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
        .expressionLanguageSupported(false)
        .required(true)
        .defaultValue("5 secs")
        .build();

    static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
        .name("max.request.size")
        .displayName("Max Request Size")
        .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
        .required(true)
        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
        .defaultValue("1 MB")
        .build();

    static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder()
        .name("partitioner.class")
        .displayName("Partitioner class")
        .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
        .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
        .defaultValue(RANDOM_PARTITIONING.getValue())
        .required(false)
        .build();

    static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
        .name("compression.type")
        .displayName("Compression Type")
        .description("This parameter allows you to specify the compression codec for all data generated by this producer.")
        .required(true)
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .allowableValues("none", "gzip", "snappy", "lz4")
        .defaultValue("none")
        .build();

    static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder()
        .name("attribute-name-regex")
        .displayName("Attributes to Send as Headers (Regex)")
        .description("A Regular Expression that is matched against all FlowFile attribute names. "
            + "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. "
            + "If not specified, no FlowFile attributes will be added as headers.")
        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
        .expressionLanguageSupported(false)
        .required(false)
        .build();
    static final PropertyDescriptor USE_TRANSACTIONS = new PropertyDescriptor.Builder()
        .name("use-transactions")
        .displayName("Use Transactions")
        .description("Specifies whether or not NiFi should provide Transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka, "
            + "and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. "
            + "If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true "
            + "requires that the <Delivery Guarantee> property be set to \"Guarantee Replicated Delivery.\"")
        .expressionLanguageSupported(false)
        .allowableValues("true", "false")
        .defaultValue("true")
        .required(true)
        .build();
    static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new PropertyDescriptor.Builder()
        .name("message-header-encoding")
        .displayName("Message Header Encoding")
        .description("For any attribute that is added as a message header, as configured via the <Attributes to Send as Headers> property, "
            + "this property indicates the Character Encoding to use for serializing the headers.")
        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
        .defaultValue("UTF-8")
        .required(false)
        .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("FlowFiles for which all content was sent to Kafka.")
        .build();

    static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
        .build();

    private static final List<PropertyDescriptor> PROPERTIES;
    private static final Set<Relationship> RELATIONSHIPS;

    // Lazily created in getPublisherPool(); torn down in closePool() on @OnStopped.
    private volatile PublisherPool publisherPool = null;

    static {
        // Order here is the order the properties appear in the UI.
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(KafkaProcessorUtils.BOOTSTRAP_SERVERS);
        properties.add(TOPIC);
        properties.add(RECORD_READER);
        properties.add(RECORD_WRITER);
        properties.add(USE_TRANSACTIONS);
        properties.add(DELIVERY_GUARANTEE);
        properties.add(ATTRIBUTE_NAME_REGEX);
        properties.add(MESSAGE_HEADER_ENCODING);
        properties.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
        properties.add(KafkaProcessorUtils.KERBEROS_PRINCIPLE);
        properties.add(KafkaProcessorUtils.USER_PRINCIPAL);
        properties.add(KafkaProcessorUtils.USER_KEYTAB);
        properties.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
        properties.add(MESSAGE_KEY_FIELD);
        properties.add(MAX_REQUEST_SIZE);
        properties.add(ACK_WAIT_TIME);
        properties.add(METADATA_WAIT_TIME);
        properties.add(PARTITION_CLASS);
        properties.add(COMPRESSION_CODEC);

        PROPERTIES = Collections.unmodifiableList(properties);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    /**
     * Treats any unknown property as a dynamic Kafka producer configuration key; the validator
     * only accepts names that are known ProducerConfig/common/SSL/SASL config keys.
     */
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
        return new PropertyDescriptor.Builder()
            .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
            .name(propertyDescriptorName)
            .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
            .dynamic(true)
            .build();
    }

    /**
     * Adds the shared Kafka validations and enforces that transactions require the
     * "Guarantee Replicated Delivery" (acks=all) setting.
     */
    @Override
    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>();
        results.addAll(KafkaProcessorUtils.validateCommonProperties(validationContext));

        final boolean useTransactions = validationContext.getProperty(USE_TRANSACTIONS).asBoolean();
        if (useTransactions) {
            final String deliveryGuarantee = validationContext.getProperty(DELIVERY_GUARANTEE).getValue();
            if (!DELIVERY_REPLICATED.getValue().equals(deliveryGuarantee)) {
                results.add(new ValidationResult.Builder()
                    .subject("Delivery Guarantee")
                    .valid(false)
                    .explanation("In order to use Transactions, the Delivery Guarantee must be \"Guarantee Replicated Delivery.\" "
                        + "Either change the <Use Transactions> property or the <Delivery Guarantee> property.")
                    .build());
            }
        }

        return results;
    }

    // Lazy creation of the pool; synchronized so only one pool is ever created per processor instance.
    private synchronized PublisherPool getPublisherPool(final ProcessContext context) {
        PublisherPool pool = publisherPool;
        if (pool != null) {
            return pool;
        }

        return publisherPool = createPublisherPool(context);
    }

    /**
     * Builds a PublisherPool from the processor configuration: common Kafka properties plus
     * byte-array serializers and the configured max request size.
     */
    protected PublisherPool createPublisherPool(final ProcessContext context) {
        final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
        final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();

        final String attributeNameRegex = context.getProperty(ATTRIBUTE_NAME_REGEX).getValue();
        final Pattern attributeNamePattern = attributeNameRegex == null ? null : Pattern.compile(attributeNameRegex);
        final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();

        final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
        final Charset charset = Charset.forName(charsetName);

        final Map<String, Object> kafkaProperties = new HashMap<>();
        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));

        return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis, useTransactions, attributeNamePattern, charset);
    }

    // Releases all pooled Kafka producers when the processor is stopped.
    @OnStopped
    public void closePool() {
        if (publisherPool != null) {
            publisherPool.close();
        }

        publisherPool = null;
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        // Batch up to ~1 MB / 500 FlowFiles per trigger to amortize the producer round-trip.
        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(1, DataUnit.MB, 500));
        if (flowFiles.isEmpty()) {
            return;
        }

        final PublisherPool pool = getPublisherPool(context);
        if (pool == null) {
            context.yield();
            return;
        }

        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();

        final long startTime = System.nanoTime();
        try (final PublisherLease lease = pool.obtainPublisher()) {
            if (useTransactions) {
                lease.beginTransaction();
            }

            // Send each FlowFile to Kafka asynchronously.
            for (final FlowFile flowFile : flowFiles) {
                if (!isScheduled()) {
                    // If stopped, re-queue FlowFile instead of sending it
                    // In transactional mode the whole session and Kafka transaction are rolled back together.
                    if (useTransactions) {
                        session.rollback();
                        lease.rollback();
                        return;
                    }

                    session.transfer(flowFile);
                    continue;
                }

                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
                final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();

                try {
                    session.read(flowFile, new InputStreamCallback() {
                        @Override
                        public void process(final InputStream rawIn) throws IOException {
                            try (final InputStream in = new BufferedInputStream(rawIn)) {
                                final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
                                final RecordSet recordSet = reader.createRecordSet();

                                final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), recordSet.getSchema());
                                lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic);
                            } catch (final SchemaNotFoundException | MalformedRecordException e) {
                                throw new ProcessException(e);
                            }
                        }
                    });
                } catch (final Exception e) {
                    // The FlowFile will be obtained and the error logged below, when calling publishResult.getFailedFlowFiles()
                    lease.fail(flowFile, e);
                    continue;
                }
            }

            // Complete the send
            final PublishResult publishResult = lease.complete();

            if (publishResult.isFailure()) {
                getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
                session.transfer(flowFiles, REL_FAILURE);
                return;
            }

            // Transfer any successful FlowFiles.
            final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
            for (FlowFile success : flowFiles) {
                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();

                final int msgCount = publishResult.getSuccessfulMessageCount(success);
                success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
                session.adjustCounter("Messages Sent", msgCount, true);

                final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
                session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
                session.transfer(success, REL_SUCCESS);
            }
        }
    }
}
