http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java new file mode 100644 index 0000000..6f17bd5 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.kafka.pubsub; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +import javax.xml.bind.DatatypeConverter; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.FlowFileFilters; +import org.apache.nifi.processor.util.StandardValidators; + +@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub", "1.0"}) +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka using the Kafka 1.0 
Producer API. " + "The messages to send may be individual FlowFiles or may be delimited, using a " + "user-specified delimiter, such as a new-line. " + "The complementary NiFi processor for fetching messages is ConsumeKafka_1_0.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", + description = "These properties will be added to the Kafka configuration after loading any provided configuration properties." + + " In the event a dynamic property represents a property that was already set, its value will be ignored and a WARN message will be logged." + + " For the list of available Kafka properties, please refer to: http://kafka.apache.org/documentation.html#configuration. ") +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to " + "FlowFiles that are routed to success. If the <Message Demarcator> Property is not set, this will always be 1, but if the Property is set, it may " + "be greater than 1.") +public class PublishKafka_1_0 extends AbstractProcessor { + protected static final String MSG_COUNT = "msg.count"; + + static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", + "FlowFile will be routed to failure unless the message is replicated to the appropriate " + + "number of Kafka Nodes according to the Topic configuration"); + static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", + "FlowFile will be routed to success if the message is received by a single Kafka node, " + + "whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> " + + "but can result in data loss if a Kafka node crashes"); + static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", + "FlowFile will be routed to success after successfully writing the content to a Kafka node, " + + "without waiting for a response. 
This provides the best performance but may result in data loss."); + + static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(Partitioners.RoundRobinPartitioner.class.getName(), + Partitioners.RoundRobinPartitioner.class.getSimpleName(), + "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, " + "the next message to Partition 2, and so on, wrapping as necessary."); + static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner", + "DefaultPartitioner", "Messages will be assigned to random partitions."); + + static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string."); + static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded", + "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters."); + + static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() + .name("topic") + .displayName("Topic Name") + .description("The name of the Kafka Topic to publish to.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() + .name(ProducerConfig.ACKS_CONFIG) + .displayName("Delivery Guarantee") + .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.") + .required(true) + .expressionLanguageSupported(false) + .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) + .defaultValue(DELIVERY_BEST_EFFORT.getValue()) + .build(); + + static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder() + .name(ProducerConfig.MAX_BLOCK_MS_CONFIG) + .displayName("Max Metadata Wait Time") + .description("The amount of time the publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the " + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(true) + .defaultValue("5 sec") + .build(); + + static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder() + .name("ack.wait.time") + .displayName("Acknowledgment Wait Time") + .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. " + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .required(true) + .defaultValue("5 secs") + .build(); + + static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder() + .name("max.request.size") + .displayName("Max Request Size") + .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("1 MB") + .build(); + + static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() + .name("kafka-key") + .displayName("Kafka Key") + .description("The Key to use for the Message. 
" + + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present " + + "and we're not demarcating.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder() + .name("key-attribute-encoding") + .displayName("Key Attribute Encoding") + .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.") + .required(true) + .defaultValue(UTF8_ENCODING.getValue()) + .allowableValues(UTF8_ENCODING, HEX_ENCODING) + .build(); + + static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() + .name("message-demarcator") + .displayName("Message Demarcator") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within " + + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the " + + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. " + + "To enter special character such as 'new line' use CTRL+Enter or Shift+Enter, depending on your OS.") + .build(); + + static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder() + .name(ProducerConfig.PARTITIONER_CLASS_CONFIG) + .displayName("Partitioner class") + .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.") + .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING) + .defaultValue(RANDOM_PARTITIONING.getValue()) + .required(false) + .build(); + + static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() + .name(ProducerConfig.COMPRESSION_TYPE_CONFIG) + .displayName("Compression Type") + .description("This parameter allows you to specify the compression codec for all data generated by this producer.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues("none", "gzip", "snappy", "lz4") + .defaultValue("none") + .build(); + + static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder() + .name("attribute-name-regex") + .displayName("Attributes to Send as Headers (Regex)") + .description("A Regular Expression that is matched against all FlowFile attribute names. " + + "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. " + + "If not specified, no FlowFile attributes will be added as headers.") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .expressionLanguageSupported(false) + .required(false) + .build(); + static final PropertyDescriptor USE_TRANSACTIONS = new PropertyDescriptor.Builder() + .name("use-transactions") + .displayName("Use Transactions") + .description("Specifies whether or not NiFi should provide Transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka, " + + "and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. 
" + + "If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true " + + "requires that the <Delivery Guarantee> property be set to \"Guarantee Replicated Delivery.\"") + .expressionLanguageSupported(false) + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .build(); + static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new PropertyDescriptor.Builder() + .name("message-header-encoding") + .displayName("Message Header Encoding") + .description("For any attribute that is added as a message header, as configured via the <Attributes to Send as Headers> property, " + + "this property indicates the Character Encoding to use for serializing the headers.") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue("UTF-8") + .required(false) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles for which all content was sent to Kafka.") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") + .build(); + + private static final List<PropertyDescriptor> PROPERTIES; + private static final Set<Relationship> RELATIONSHIPS; + + private volatile PublisherPool publisherPool = null; + + static { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors()); + properties.add(TOPIC); + properties.add(DELIVERY_GUARANTEE); + properties.add(USE_TRANSACTIONS); + properties.add(ATTRIBUTE_NAME_REGEX); + properties.add(MESSAGE_HEADER_ENCODING); + properties.add(KEY); + properties.add(KEY_ATTRIBUTE_ENCODING); + properties.add(MESSAGE_DEMARCATOR); + properties.add(MAX_REQUEST_SIZE); + properties.add(ACK_WAIT_TIME); + properties.add(METADATA_WAIT_TIME); + properties.add(PARTITION_CLASS); + properties.add(COMPRESSION_CODEC); + + PROPERTIES = Collections.unmodifiableList(properties); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + RELATIONSHIPS = Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") + .name(propertyDescriptorName) + .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class)) + .dynamic(true) + .build(); + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + final List<ValidationResult> results = new ArrayList<>(); + results.addAll(KafkaProcessorUtils.validateCommonProperties(validationContext)); + + final boolean useTransactions = validationContext.getProperty(USE_TRANSACTIONS).asBoolean(); + if (useTransactions) { + final String deliveryGuarantee = validationContext.getProperty(DELIVERY_GUARANTEE).getValue(); + if (!DELIVERY_REPLICATED.getValue().equals(deliveryGuarantee)) { + results.add(new ValidationResult.Builder() + .subject("Delivery Guarantee") + 
.valid(false) + .explanation("In order to use Transactions, the Delivery Guarantee must be \"Guarantee Replicated Delivery.\" " + + "Either change the <Use Transactions> property or the <Delivery Guarantee> property.") + .build()); + } + } + + return results; + } + + private synchronized PublisherPool getPublisherPool(final ProcessContext context) { + PublisherPool pool = publisherPool; + if (pool != null) { + return pool; + } + + return publisherPool = createPublisherPool(context); + } + + protected PublisherPool createPublisherPool(final ProcessContext context) { + final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue(); + final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue(); + + final String attributeNameRegex = context.getProperty(ATTRIBUTE_NAME_REGEX).getValue(); + final Pattern attributeNamePattern = attributeNameRegex == null ? null : Pattern.compile(attributeNameRegex); + final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean(); + + final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue(); + final Charset charset = Charset.forName(charsetName); + + final Map<String, Object> kafkaProperties = new HashMap<>(); + KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties); + kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize)); + + return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis, useTransactions, attributeNamePattern, charset); + } + + @OnStopped + public void closePool() { + if (publisherPool != null) { + publisherPool.close(); + } + + publisherPool = null; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final boolean useDemarcator = context.getProperty(MESSAGE_DEMARCATOR).isSet(); + + final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 500)); + if (flowFiles.isEmpty()) { + return; + } + + final PublisherPool pool = getPublisherPool(context); + if (pool == null) { + context.yield(); + return; + } + + final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(); + final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); + final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean(); + + final long startTime = System.nanoTime(); + try (final PublisherLease lease = pool.obtainPublisher()) { + if (useTransactions) { + lease.beginTransaction(); + } + + // Send each FlowFile to Kafka asynchronously. 
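+ // If the processor is stopped mid-batch, the loop below re-queues the remaining FlowFiles; when transactions are in use, the entire session and Kafka transaction are rolled back instead.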
+ for (final FlowFile flowFile : flowFiles) { + if (!isScheduled()) { + // If stopped, re-queue FlowFile instead of sending it + if (useTransactions) { + session.rollback(); + lease.rollback(); + return; + } + + session.transfer(flowFile); + continue; + } + + final byte[] messageKey = getMessageKey(flowFile, context); + final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); + final byte[] demarcatorBytes; + if (useDemarcator) { + demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8); + } else { + demarcatorBytes = null; + } + + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream rawIn) throws IOException { + try (final InputStream in = new BufferedInputStream(rawIn)) { + lease.publish(flowFile, in, messageKey, demarcatorBytes, topic); + } + } + }); + } + + // Complete the send + final PublishResult publishResult = lease.complete(); + + if (publishResult.isFailure()) { + getLogger().info("Failed to send FlowFile to Kafka; transferring to failure"); + session.transfer(flowFiles, REL_FAILURE); + return; + } + + // Transfer any successful FlowFiles. + final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); + for (FlowFile success : flowFiles) { + final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue(); + + final int msgCount = publishResult.getSuccessfulMessageCount(success); + success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount)); + session.adjustCounter("Messages Sent", msgCount, true); + + final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic); + session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis); + session.transfer(success, REL_SUCCESS); + } + } + } + + + private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) { + if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) { + return null; + } + + final String uninterpretedKey; + if (context.getProperty(KEY).isSet()) { + uninterpretedKey = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + } else { + uninterpretedKey = flowFile.getAttribute(KafkaProcessorUtils.KAFKA_KEY); + } + + if (uninterpretedKey == null) { + return null; + } + + final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue(); + if (UTF8_ENCODING.getValue().equals(keyEncoding)) { + return uninterpretedKey.getBytes(StandardCharsets.UTF_8); + } + + return DatatypeConverter.parseHexBinary(uninterpretedKey); + } +}
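For illustration only (not part of this patch): a minimal sketch of how the <Message Demarcator> property turns one FlowFile's content into multiple Kafka messages. It uses the same StreamDemarcator class that the new PublisherLease below relies on; the sample strings and the printing are assumptions, while the 1 MB cap mirrors the processor's Max Request Size default.

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;

    import org.apache.nifi.stream.io.util.StreamDemarcator;

    public class DemarcatorSketch {
        public static void main(final String[] args) throws Exception {
            // Stand-in for a FlowFile's content, published with a '\n' Message Demarcator.
            final InputStream content = new ByteArrayInputStream(
                    "first message\nsecond message\nthird message".getBytes(StandardCharsets.UTF_8));
            final byte[] demarcator = "\n".getBytes(StandardCharsets.UTF_8);

            // 1 MB mirrors the Max Request Size default; a single token larger than this
            // causes a TokenTooLargeException, which PublisherLease records as a failure.
            try (final StreamDemarcator splitter = new StreamDemarcator(content, demarcator, 1_048_576)) {
                byte[] token;
                while ((token = splitter.nextToken()) != null) {
                    // In PublisherLease.publish(), each token becomes the value of one ProducerRecord.
                    System.out.println(new String(token, StandardCharsets.UTF_8));
                }
            }
        }
    }

Note that PublishKafka_1_0.getMessageKey() above returns null whenever a demarcator is set, so demarcated messages are sent without a key; otherwise every message produced from the FlowFile would carry the same key.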
http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java new file mode 100644 index 0000000..1f7c3ab --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.kafka.pubsub; + +import org.apache.nifi.flowfile.FlowFile; + +public interface PublishResult { + + boolean isFailure(); + + int getSuccessfulMessageCount(FlowFile flowFile); + + Exception getReasonForFailure(FlowFile flowFile); + + public static PublishResult EMPTY = new PublishResult() { + @Override + public boolean isFailure() { + return false; + } + + @Override + public int getSuccessfulMessageCount(FlowFile flowFile) { + return 0; + } + + @Override + public Exception getReasonForFailure(FlowFile flowFile) { + return null; + } + }; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java new file mode 100644 index 0000000..abcd15f --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.kafka.pubsub; + +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.header.Headers; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.stream.io.exception.TokenTooLargeException; +import org.apache.nifi.stream.io.util.StreamDemarcator; + +public class PublisherLease implements Closeable { + private final ComponentLog logger; + private final Producer<byte[], byte[]> producer; + private final int maxMessageSize; + private final long maxAckWaitMillis; + private final boolean useTransactions; + private final Pattern attributeNameRegex; + private final Charset headerCharacterSet; + private volatile boolean poisoned = false; + private final AtomicLong messagesSent = new AtomicLong(0L); + + private volatile boolean transactionsInitialized = false; + private volatile boolean activeTransaction = false; + + private InFlightMessageTracker tracker; + + public PublisherLease(final Producer<byte[], byte[]> producer, final int maxMessageSize, final long maxAckWaitMillis, final ComponentLog logger, + final boolean useTransactions, final Pattern attributeNameRegex, final Charset headerCharacterSet) { + this.producer = producer; + this.maxMessageSize = maxMessageSize; + this.logger = logger; + this.maxAckWaitMillis = maxAckWaitMillis; + this.useTransactions = useTransactions; + this.attributeNameRegex = attributeNameRegex; + this.headerCharacterSet = headerCharacterSet; + } + + protected void poison() { + this.poisoned = true; + } + + public boolean isPoisoned() { + return poisoned; + } + + void beginTransaction() { + if (!useTransactions) { + return; + } + + if (!transactionsInitialized) { + producer.initTransactions(); + transactionsInitialized = true; + } + + producer.beginTransaction(); + activeTransaction = true; + } + + void rollback() { + if (!useTransactions || !activeTransaction) { + return; + } + + producer.abortTransaction(); + activeTransaction = false; + } + + void fail(final FlowFile flowFile, final Exception cause) { + getTracker().fail(flowFile, cause); + rollback(); + } + + void publish(final FlowFile flowFile, final InputStream flowFileContent, final 
byte[] messageKey, final byte[] demarcatorBytes, final String topic) throws IOException { + if (tracker == null) { + tracker = new InFlightMessageTracker(logger); + } + + try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) { + byte[] messageContent; + try { + while ((messageContent = demarcator.nextToken()) != null) { + // We do not want to use any key if we have a demarcator because that would result in + // the key being the same for multiple messages + final byte[] keyToUse = demarcatorBytes == null ? messageKey : null; + publish(flowFile, keyToUse, messageContent, topic, tracker); + + if (tracker.isFailed(flowFile)) { + // If we have a failure, don't try to send anything else. + return; + } + } + } catch (final TokenTooLargeException ttle) { + tracker.fail(flowFile, ttle); + } + } catch (final Exception e) { + tracker.fail(flowFile, e); + poison(); + throw e; + } + } + + void publish(final FlowFile flowFile, final RecordSet recordSet, final RecordSetWriterFactory writerFactory, final RecordSchema schema, + final String messageKeyField, final String topic) throws IOException { + if (tracker == null) { + tracker = new InFlightMessageTracker(logger); + } + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); + + Record record; + int recordCount = 0; + + try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) { + while ((record = recordSet.next()) != null) { + recordCount++; + baos.reset(); + + writer.write(record); + writer.flush(); + + final byte[] messageContent = baos.toByteArray(); + final String key = messageKeyField == null ? null : record.getAsString(messageKeyField); + final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8); + + publish(flowFile, messageKey, messageContent, topic, tracker); + + if (tracker.isFailed(flowFile)) { + // If we have a failure, don't try to send anything else. 
+ return; + } + } + + if (recordCount == 0) { + tracker.trackEmpty(flowFile); + } + } catch (final TokenTooLargeException ttle) { + tracker.fail(flowFile, ttle); + } catch (final SchemaNotFoundException snfe) { + throw new IOException(snfe); + } catch (final Exception e) { + tracker.fail(flowFile, e); + poison(); + throw e; + } + } + + private void addHeaders(final FlowFile flowFile, final ProducerRecord<?, ?> record) { + if (attributeNameRegex == null) { + return; + } + + final Headers headers = record.headers(); + for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) { + if (attributeNameRegex.matcher(entry.getKey()).matches()) { + headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet)); + } + } + } + + protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) { + final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, messageKey, messageContent); + addHeaders(flowFile, record); + + producer.send(record, new Callback() { + @Override + public void onCompletion(final RecordMetadata metadata, final Exception exception) { + if (exception == null) { + tracker.incrementAcknowledgedCount(flowFile); + } else { + tracker.fail(flowFile, exception); + poison(); + } + } + }); + + messagesSent.incrementAndGet(); + tracker.incrementSentCount(flowFile); + } + + + public PublishResult complete() { + if (tracker == null) { + if (messagesSent.get() == 0L) { + return PublishResult.EMPTY; + } + + rollback(); + throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed"); + } + + producer.flush(); + + if (activeTransaction) { + producer.commitTransaction(); + activeTransaction = false; + } + + try { + tracker.awaitCompletion(maxAckWaitMillis); + return tracker.createPublishResult(); + } catch (final InterruptedException e) { + logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka"); + Thread.currentThread().interrupt(); + return tracker.failOutstanding(e); + } catch (final TimeoutException e) { + logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka"); + return tracker.failOutstanding(e); + } finally { + tracker = null; + } + } + + @Override + public void close() { + producer.close(maxAckWaitMillis, TimeUnit.MILLISECONDS); + tracker = null; + } + + public InFlightMessageTracker getTracker() { + if (tracker == null) { + tracker = new InFlightMessageTracker(logger); + } + + return tracker; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java new file mode 100644 index 0000000..d5caa8d --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.kafka.pubsub; + +import java.io.Closeable; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.regex.Pattern; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.nifi.logging.ComponentLog; + +public class PublisherPool implements Closeable { + private final ComponentLog logger; + private final BlockingQueue<PublisherLease> publisherQueue; + private final Map<String, Object> kafkaProperties; + private final int maxMessageSize; + private final long maxAckWaitMillis; + private final boolean useTransactions; + private final Pattern attributeNameRegex; + private final Charset headerCharacterSet; + + private volatile boolean closed = false; + + PublisherPool(final Map<String, Object> kafkaProperties, final ComponentLog logger, final int maxMessageSize, final long maxAckWaitMillis, + final boolean useTransactions, final Pattern attributeNameRegex, final Charset headerCharacterSet) { + this.logger = logger; + this.publisherQueue = new LinkedBlockingQueue<>(); + this.kafkaProperties = kafkaProperties; + this.maxMessageSize = maxMessageSize; + this.maxAckWaitMillis = maxAckWaitMillis; + this.useTransactions = useTransactions; + this.attributeNameRegex = attributeNameRegex; + this.headerCharacterSet = headerCharacterSet; + } + + public PublisherLease obtainPublisher() { + if (isClosed()) { + throw new IllegalStateException("Connection Pool is closed"); + } + + PublisherLease lease = publisherQueue.poll(); + if (lease != null) { + return lease; + } + + lease = createLease(); + return lease; + } + + private PublisherLease createLease() { + final Map<String, Object> properties = new HashMap<>(kafkaProperties); + if (useTransactions) { + properties.put("transactional.id", UUID.randomUUID().toString()); + } + + final Producer<byte[], byte[]> producer = new KafkaProducer<>(properties); + + final PublisherLease lease = new PublisherLease(producer, maxMessageSize, maxAckWaitMillis, logger, useTransactions, attributeNameRegex, headerCharacterSet) { + @Override + public void close() { + if (isPoisoned() || isClosed()) { + super.close(); + } else { + publisherQueue.offer(this); + } + } + }; + + return lease; + } + + public synchronized boolean isClosed() { + return closed; + } + + @Override + public synchronized void close() { + closed = true; + + PublisherLease lease; + while ((lease = publisherQueue.poll()) != null) { + lease.close(); + } + } + + /** + * Returns the number of leases that are currently available + * + * @return the number of leases currently 
available + */ + protected int available() { + return publisherQueue.size(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..ea9d84d --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.kafka.pubsub.PublishKafka_1_0 +org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0 +org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_1_0 +org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0/additionalDetails.html new file mode 100644 index 0000000..1fd6449 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0/additionalDetails.html @@ -0,0 +1,143 @@ +<!DOCTYPE html> +<html lang="en"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. + --> + <head> + <meta charset="utf-8" /> + <title>ConsumeKafkaRecord</title> + <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" /> + </head> + + <body> + <h2>Description</h2> + <p> + This Processor polls <a href="http://kafka.apache.org/">Apache Kafka</a> + for data using the KafkaConsumer API available with Kafka 1.0. When a message is received + from Kafka, the message will be deserialized using the configured Record Reader, and then + written to a FlowFile by serializing the message with the configured Record Writer. + </p> + + + <h2>Security Configuration</h2> + <p> + The Security Protocol property allows the user to specify the protocol for communicating + with the Kafka broker. The following sections describe each of the protocols in further detail. + </p> + <h3>PLAINTEXT</h3> + <p> + This option provides an unsecured connection to the broker, with no client authentication and no encryption. + In order to use this option the broker must be configured with a listener of the form: + <pre> + PLAINTEXT://host.name:port + </pre> + </p> + <h3>SSL</h3> + <p> + This option provides an encrypted connection to the broker, with optional client authentication. In order + to use this option the broker must be configured with a listener of the form: + <pre> + SSL://host.name:port + </pre> + In addition, the processor must have an SSL Context Service selected. + </p> + <p> + If the broker specifies ssl.client.auth=none, or does not specify ssl.client.auth, then the client will + not be required to present a certificate. In this case, the SSL Context Service selected may specify only + a truststore containing the public key of the certificate authority used to sign the broker's key. + </p> + <p> + If the broker specifies ssl.client.auth=required then the client will be required to present a certificate. + In this case, the SSL Context Service must also specify a keystore containing a client key, in addition to + a truststore as described above. + </p> + <h3>SASL_PLAINTEXT</h3> + <p> + This option uses SASL with a PLAINTEXT transport layer to authenticate to the broker. In order to use this + option the broker must be configured with a listener of the form: + <pre> + SASL_PLAINTEXT://host.name:port + </pre> + In addition, the Kerberos Service Name must be specified in the processor. + </p> + <h4>SASL_PLAINTEXT - GSSAPI</h4> + <p> + If the SASL mechanism is GSSAPI, then the client must provide a JAAS configuration to authenticate. The + JAAS configuration can be provided by specifying the java.security.auth.login.config system property in + NiFi's bootstrap.conf, such as: + <pre> + java.arg.16=-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf + </pre> + </p> + <p> + An example of the JAAS config file would be the following: + <pre> + KafkaClient { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + storeKey=true + keyTab="/path/to/nifi.keytab" + serviceName="kafka" + principal="nifi@YOURREALM.COM"; + }; + </pre> + <b>NOTE:</b> The serviceName in the JAAS file must match the Kerberos Service Name in the processor. + </p> + <p> + Alternatively, the JAAS + configuration when using GSSAPI can be provided by specifying the Kerberos Principal and Kerberos Keytab + directly in the processor properties. 
This will dynamically create a JAAS configuration like above, and + will take precedence over the java.security.auth.login.config system property. + </p> + <h4>SASL_PLAINTEXT - PLAIN</h4> + <p> + If the SASL mechanism is PLAIN, then the client must provide a JAAS configuration to authenticate, but + the JAAS configuration must use Kafka's PlainLoginModule. An example of the JAAS config file would + be the following: + <pre> + KafkaClient { + org.apache.kafka.common.security.plain.PlainLoginModule required + username="nifi" + password="nifi-password"; + }; + </pre> + </p> + <p> + <b>NOTE:</b> It is not recommended to use a SASL mechanism of PLAIN with SASL_PLAINTEXT, as it would transmit + the username and password unencrypted. + </p> + <p> + <b>NOTE:</b> Using the PlainLoginModule will cause it to be registered in the JVM's static list of Providers, making + it visible to components in other NARs that may access the providers. There is currently a known issue + where Kafka processors using the PlainLoginModule will cause HDFS processors with Kerberos to no longer work. + </p> + <h3>SASL_SSL</h3> + <p> + This option uses SASL with an SSL/TLS transport layer to authenticate to the broker. In order to use this + option the broker must be configured with a listener of the form: + <pre> + SASL_SSL://host.name:port + </pre> + </p> + <p> + See the SASL_PLAINTEXT section for a description of how to provide the proper JAAS configuration + depending on the SASL mechanism (GSSAPI or PLAIN). + </p> + <p> + See the SSL section for a description of how to configure the SSL Context Service based on the + ssl.client.auth property. + </p> + + </body> +</html> http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_1_0/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_1_0/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_1_0/additionalDetails.html new file mode 100644 index 0000000..f206b0b --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_1_0/additionalDetails.html @@ -0,0 +1,143 @@ +<!DOCTYPE html> +<html lang="en"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + <head> + <meta charset="utf-8" /> + <title>ConsumeKafka</title> + <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" /> + </head> + + <body> + <h2>Description</h2> + <p> + This Processor polls <a href="http://kafka.apache.org/">Apache Kafka</a> + for data using the KafkaConsumer API available with Kafka 1.0. When a message is received + from Kafka, this Processor emits a FlowFile where the content of the FlowFile is the value + of the Kafka message. + </p> + + + <h2>Security Configuration</h2> + <p> + The Security Protocol property allows the user to specify the protocol for communicating + with the Kafka broker. The following sections describe each of the protocols in further detail. + </p> + <h3>PLAINTEXT</h3> + <p> + This option provides an unsecured connection to the broker, with no client authentication and no encryption. + In order to use this option the broker must be configured with a listener of the form: + <pre> + PLAINTEXT://host.name:port + </pre> + </p> + <h3>SSL</h3> + <p> + This option provides an encrypted connection to the broker, with optional client authentication. In order + to use this option the broker must be configured with a listener of the form: + <pre> + SSL://host.name:port + </pre> + In addition, the processor must have an SSL Context Service selected. + </p> + <p> + If the broker specifies ssl.client.auth=none, or does not specify ssl.client.auth, then the client will + not be required to present a certificate. In this case, the SSL Context Service selected may specify only + a truststore containing the public key of the certificate authority used to sign the broker's key. + </p> + <p> + If the broker specifies ssl.client.auth=required then the client will be required to present a certificate. + In this case, the SSL Context Service must also specify a keystore containing a client key, in addition to + a truststore as described above. + </p> + <h3>SASL_PLAINTEXT</h3> + <p> + This option uses SASL with a PLAINTEXT transport layer to authenticate to the broker. In order to use this + option the broker must be configured with a listener of the form: + <pre> + SASL_PLAINTEXT://host.name:port + </pre> + In addition, the Kerberos Service Name must be specified in the processor. + </p> + <h4>SASL_PLAINTEXT - GSSAPI</h4> + <p> + If the SASL mechanism is GSSAPI, then the client must provide a JAAS configuration to authenticate. The + JAAS configuration can be provided by specifying the java.security.auth.login.config system property in + NiFi's bootstrap.conf, such as: + <pre> + java.arg.16=-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf + </pre> + </p> + <p> + An example of the JAAS config file would be the following: + <pre> + KafkaClient { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + storeKey=true + keyTab="/path/to/nifi.keytab" + serviceName="kafka" + principal="nifi@YOURREALM.COM"; + }; + </pre> + <b>NOTE:</b> The serviceName in the JAAS file must match the Kerberos Service Name in the processor. + </p> + <p> + Alternatively, the JAAS + configuration when using GSSAPI can be provided by specifying the Kerberos Principal and Kerberos Keytab + directly in the processor properties. This will dynamically create a JAAS configuration like above, and + will take precedence over the java.security.auth.login.config system property. 
+ </p> + <h4>SASL_PLAINTEXT - PLAIN</h4> + <p> + If the SASL mechanism is PLAIN, then the client must provide a JAAS configuration to authenticate, but + the JAAS configuration must use Kafka's PlainLoginModule. An example of the JAAS config file would + be the following: + <pre> + KafkaClient { + org.apache.kafka.common.security.plain.PlainLoginModule required + username="nifi" + password="nifi-password"; + }; + </pre> + </p> + <p> + <b>NOTE:</b> It is not recommended to use a SASL mechanism of PLAIN with SASL_PLAINTEXT, as it would transmit + the username and password unencrypted. + </p> + <p> + <b>NOTE:</b> Using the PlainLoginModule will cause it to be registered in the JVM's static list of Providers, making + it visible to components in other NARs that may access the providers. There is currently a known issue + where Kafka processors using the PlainLoginModule will cause HDFS processors with Kerberos to no longer work. + </p> + <h3>SASL_SSL</h3> + <p> + This option uses SASL with an SSL/TLS transport layer to authenticate to the broker. In order to use this + option the broker must be configured with a listener of the form: + <pre> + SASL_SSL://host.name:port + </pre> + </p> + <p> + See the SASL_PLAINTEXT section for a description of how to provide the proper JAAS configuration + depending on the SASL mechanism (GSSAPI or PLAIN). + </p> + <p> + See the SSL section for a description of how to configure the SSL Context Service based on the + ssl.client.auth property. + </p> + + </body> +</html> http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0/additionalDetails.html new file mode 100644 index 0000000..54b7786 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_1_0/additionalDetails.html @@ -0,0 +1,144 @@ +<!DOCTYPE html> +<html lang="en"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + <head> + <meta charset="utf-8" /> + <title>PublishKafkaRecord</title> + <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" /> + </head> + + <body> + <h2>Description</h2> + <p> + This Processor publishes the contents of a FlowFile to a Topic in + <a href="http://kafka.apache.org/">Apache Kafka</a> using the KafkaProducer API available + with Kafka 1.0. The contents of the incoming FlowFile will be read using the + configured Record Reader. Each record will then be serialized using the configured + Record Writer, and this serialized form will be the content of a Kafka message. + This message is optionally assigned a key by using the <Kafka Key> Property. + </p> + + + <h2>Security Configuration</h2> + <p> + The Security Protocol property allows the user to specify the protocol for communicating + with the Kafka broker. The following sections describe each of the protocols in further detail. + </p> + <h3>PLAINTEXT</h3> + <p> + This option provides an unsecured connection to the broker, with no client authentication and no encryption. + In order to use this option the broker must be configured with a listener of the form: + <pre> + PLAINTEXT://host.name:port + </pre> + </p> + <h3>SSL</h3> + <p> + This option provides an encrypted connection to the broker, with optional client authentication. In order + to use this option the broker must be configured with a listener of the form: + <pre> + SSL://host.name:port + </pre> + In addition, the processor must have an SSL Context Service selected. + </p> + <p> + If the broker specifies ssl.client.auth=none, or does not specify ssl.client.auth, then the client will + not be required to present a certificate. In this case, the SSL Context Service selected may specify only + a truststore containing the public key of the certificate authority used to sign the broker's key. + </p> + <p> + If the broker specifies ssl.client.auth=required then the client will be required to present a certificate. + In this case, the SSL Context Service must also specify a keystore containing a client key, in addition to + a truststore as described above. + </p> + <h3>SASL_PLAINTEXT</h3> + <p> + This option uses SASL with a PLAINTEXT transport layer to authenticate to the broker. In order to use this + option the broker must be configured with a listener of the form: + <pre> + SASL_PLAINTEXT://host.name:port + </pre> + In addition, the Kerberos Service Name must be specified in the processor. + </p> + <h4>SASL_PLAINTEXT - GSSAPI</h4> + <p> + If the SASL mechanism is GSSAPI, then the client must provide a JAAS configuration to authenticate. The + JAAS configuration can be provided by specifying the java.security.auth.login.config system property in + NiFi's bootstrap.conf, such as: + <pre> + java.arg.16=-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf + </pre> + </p> + <p> + An example of the JAAS config file would be the following: + <pre> + KafkaClient { + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + storeKey=true + keyTab="/path/to/nifi.keytab" + serviceName="kafka" + principal="nifi@YOURREALM.COM"; + }; + </pre> + <b>NOTE:</b> The serviceName in the JAAS file must match the Kerberos Service Name in the processor. + </p> + <p> + Alternatively, the JAAS + configuration when using GSSAPI can be provided by specifying the Kerberos Principal and Kerberos Keytab + directly in the processor properties. 
+        <h4>SASL_PLAINTEXT - PLAIN</h4>
+        <p>
+            If the SASL mechanism is PLAIN, then the client must provide a JAAS configuration to authenticate, but
+            the JAAS configuration must use Kafka's PlainLoginModule. An example of the JAAS config file would
+            be the following:
+            <pre>
+            KafkaClient {
+              org.apache.kafka.common.security.plain.PlainLoginModule required
+              username="nifi"
+              password="nifi-password";
+            };
+            </pre>
+        </p>
+        <p>
+            <b>NOTE:</b> It is not recommended to use a SASL mechanism of PLAIN with SASL_PLAINTEXT, as it would transmit
+            the username and password unencrypted.
+        </p>
+        <p>
+            <b>NOTE:</b> Using the PlainLoginModule will cause it to be registered in the JVM's static list of Providers, making
+            it visible to components in other NARs that may access the providers. There is currently a known issue
+            where Kafka processors using the PlainLoginModule will cause HDFS processors with Kerberos to no longer work.
+        </p>
+        <h3>SASL_SSL</h3>
+        <p>
+            This option uses SASL with an SSL/TLS transport layer to authenticate to the broker. In order to use this
+            option the broker must be configured with a listener of the form:
+            <pre>
+            SASL_SSL://host.name:port
+            </pre>
+        </p>
+        <p>
+            See the SASL_PLAINTEXT section for a description of how to provide the proper JAAS configuration
+            depending on the SASL mechanism (GSSAPI or PLAIN).
+        </p>
+        <p>
+            See the SSL section for a description of how to configure the SSL Context Service based on the
+            ssl.client.auth property.
+        </p>
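+        <p>
+            A minimal sketch of wiring an SSL Context Service in a test, assuming the StandardSSLContextService
+            implementation and an "ssl.context.service" property name on the processor (both are assumptions for
+            this example; paths and passwords are placeholders and exception handling is elided):
+            <pre>
+            // Register and enable an SSL Context Service, then point the processor at it.
+            final StandardSSLContextService sslService = new StandardSSLContextService();
+            final TestRunner runner = TestRunners.newTestRunner(new PublishKafkaRecord_1_0());
+            runner.addControllerService("ssl-context", sslService);
+            runner.setProperty(sslService, StandardSSLContextService.TRUSTSTORE, "/path/to/truststore.jks");
+            runner.setProperty(sslService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "trustPass");
+            runner.setProperty(sslService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS");
+            runner.enableControllerService(sslService);
+            runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, "SSL");  // literal protocol value
+            runner.setProperty("ssl.context.service", "ssl-context");          // assumed property name
+            </pre>
+            With ssl.client.auth=required on the broker, the service would additionally need the keystore
+            properties described in the SSL section above.
+        </p>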
+    </body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka_1_0/additionalDetails.html
----------------------------------------------------------------------

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka_1_0/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka_1_0/additionalDetails.html
new file mode 100644
index 0000000..7d68fe0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka_1_0/additionalDetails.html
@@ -0,0 +1,156 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements. See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License. You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>PublishKafka</title>
+        <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+        <h2>Description</h2>
+        <p>
+            This Processor publishes the contents of a FlowFile to a Topic in
+            <a href="http://kafka.apache.org/">Apache Kafka</a> using the KafkaProducer API available
+            with Kafka 1.0. The content of a FlowFile becomes the contents of a Kafka message.
+            This message is optionally assigned a key by using the <Kafka Key> Property.
+        </p>
+
+        <p>
+            The Processor allows the user to configure an optional Message Demarcator that
+            can be used to send many messages per FlowFile. For example, a <i>\n</i> could be used
+            to indicate that the contents of the FlowFile should be used to send one message
+            per line of text. It also supports multi-character demarcators (e.g., 'my custom demarcator').
+            If the property is not set, the entire contents of the FlowFile
+            will be sent as a single message. When using the demarcator, if some messages are
+            successfully sent but other messages fail to send, the resulting FlowFile will be
+            considered a failed FlowFile and will have additional attributes to that effect.
+            One such attribute is 'failed.last.idx', which indicates the index of the last message
+            that was successfully ACKed by Kafka (if no demarcator is used, the value of this index will be -1).
+            This allows PublishKafka to re-send only the un-ACKed messages on the next retry.
+        </p>
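+        <p>
+            To make the retry bookkeeping concrete, here is a minimal plain-Java sketch of the semantics
+            described above (names and values are illustrative only; this is not the processor's actual
+            implementation):
+            <pre>
+            // Split FlowFile content on the demarcator and work out what a retry would re-send.
+            final String content = "msg-0\nmsg-1\nmsg-2\nmsg-3";
+            final String[] messages = content.split(Pattern.quote("\n"));
+
+            final int failedLastIdx = 1;  // from 'failed.last.idx': msg-0 and msg-1 were ACKed by Kafka
+            final List<String> toResend = new ArrayList<>();
+            for (int i = failedLastIdx + 1; i < messages.length; i++) {
+                toResend.add(messages[i]);  // only msg-2 and msg-3 go out on the next retry
+            }
+            </pre>
+        </p>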
+
+        <h2>Security Configuration</h2>
+        <p>
+            The Security Protocol property allows the user to specify the protocol for communicating
+            with the Kafka broker. The following sections describe each of the protocols in further detail.
+        </p>
+        <h3>PLAINTEXT</h3>
+        <p>
+            This option provides an unsecured connection to the broker, with no client authentication and no encryption.
+            In order to use this option the broker must be configured with a listener of the form:
+            <pre>
+            PLAINTEXT://host.name:port
+            </pre>
+        </p>
+        <h3>SSL</h3>
+        <p>
+            This option provides an encrypted connection to the broker, with optional client authentication. In order
+            to use this option the broker must be configured with a listener of the form:
+            <pre>
+            SSL://host.name:port
+            </pre>
+            In addition, the processor must have an SSL Context Service selected.
+        </p>
+        <p>
+            If the broker specifies ssl.client.auth=none, or does not specify ssl.client.auth, then the client will
+            not be required to present a certificate. In this case, the SSL Context Service selected may specify only
+            a truststore containing the public key of the certificate authority used to sign the broker's key.
+        </p>
+        <p>
+            If the broker specifies ssl.client.auth=required then the client will be required to present a certificate.
+            In this case, the SSL Context Service must also specify a keystore containing a client key, in addition to
+            a truststore as described above.
+        </p>
+        <h3>SASL_PLAINTEXT</h3>
+        <p>
+            This option uses SASL with a PLAINTEXT transport layer to authenticate to the broker. In order to use this
+            option the broker must be configured with a listener of the form:
+            <pre>
+            SASL_PLAINTEXT://host.name:port
+            </pre>
+            In addition, the Kerberos Service Name must be specified in the processor.
+        </p>
+        <h4>SASL_PLAINTEXT - GSSAPI</h4>
+        <p>
+            If the SASL mechanism is GSSAPI, then the client must provide a JAAS configuration to authenticate. The
+            JAAS configuration can be provided by specifying the java.security.auth.login.config system property in
+            NiFi's bootstrap.conf, such as:
+            <pre>
+            java.arg.16=-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf
+            </pre>
+        </p>
+        <p>
+            An example of the JAAS config file would be the following:
+            <pre>
+            KafkaClient {
+              com.sun.security.auth.module.Krb5LoginModule required
+              useKeyTab=true
+              storeKey=true
+              keyTab="/path/to/nifi.keytab"
+              serviceName="kafka"
+              principal="[email protected]";
+            };
+            </pre>
+            <b>NOTE:</b> The serviceName in the JAAS file must match the Kerberos Service Name in the processor.
+        </p>
+        <p>
+            Alternatively, the JAAS configuration when using GSSAPI can be provided by specifying the Kerberos
+            Principal and Kerberos Keytab directly in the processor properties. This will dynamically create a JAAS
+            configuration like above, and will take precedence over the java.security.auth.login.config system property.
+        </p>
+        <h4>SASL_PLAINTEXT - PLAIN</h4>
+        <p>
+            If the SASL mechanism is PLAIN, then the client must provide a JAAS configuration to authenticate, but
+            the JAAS configuration must use Kafka's PlainLoginModule. An example of the JAAS config file would
+            be the following:
+            <pre>
+            KafkaClient {
+              org.apache.kafka.common.security.plain.PlainLoginModule required
+              username="nifi"
+              password="nifi-password";
+            };
+            </pre>
+        </p>
+        <p>
+            <b>NOTE:</b> It is not recommended to use a SASL mechanism of PLAIN with SASL_PLAINTEXT, as it would transmit
+            the username and password unencrypted.
+        </p>
+        <p>
+            <b>NOTE:</b> Using the PlainLoginModule will cause it to be registered in the JVM's static list of Providers, making
+            it visible to components in other NARs that may access the providers. There is currently a known issue
+            where Kafka processors using the PlainLoginModule will cause HDFS processors with Kerberos to no longer work.
+        </p>
+        <h3>SASL_SSL</h3>
+        <p>
+            This option uses SASL with an SSL/TLS transport layer to authenticate to the broker. In order to use this
+            option the broker must be configured with a listener of the form:
+            <pre>
+            SASL_SSL://host.name:port
+            </pre>
+        </p>
+        <p>
+            See the SASL_PLAINTEXT section for a description of how to provide the proper JAAS configuration
+            depending on the SASL mechanism (GSSAPI or PLAIN).
+        </p>
+        <p>
+            See the SSL section for a description of how to configure the SSL Context Service based on the
+            ssl.client.auth property.
+        </p>
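+        <p>
+            A minimal sketch of the validation behaviour for this processor's security properties, mirroring
+            what ConsumeKafkaTest (later in this commit) exercises for the consumer; since the security
+            properties are shared via KafkaProcessorUtils, the same rules are assumed to apply here
+            (values are placeholders):
+            <pre>
+            TestRunner runner = TestRunners.newTestRunner(new PublishKafka_1_0());
+            runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "broker.example.com:9092");
+            runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+            // invalid until the Kerberos Service Name is provided
+            runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka");
+            // if a user principal is set, a keytab pointing at an existing file must accompany it
+            runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@EXAMPLE.COM");
+            runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "/path/to/nifi.keytab");
+            </pre>
+        </p>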
+    </body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
new file mode 100644
index 0000000..7b5a8fc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ConsumeKafkaTest {
+
+    ConsumerLease mockLease = null;
+    ConsumerPool mockConsumerPool = null;
+
+    @Before
+    public void setup() {
+        mockLease = mock(ConsumerLease.class);
+        mockConsumerPool = mock(ConsumerPool.class);
+    }
+
+    @Test
+    public void validateCustomValidatorSettings() throws Exception {
+        ConsumeKafka_1_0 consumeKafka = new ConsumeKafka_1_0();
+        TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo");
+        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST);
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Foo");
+        runner.assertNotValid();
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void validatePropertiesValidation() throws Exception {
+        ConsumeKafka_1_0 consumeKafka = new ConsumeKafka_1_0();
+        TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo");
+        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST);
+
+        runner.removeProperty(ConsumeKafka_1_0.GROUP_ID);
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("invalid because Group ID is required"));
+        }
+
+        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, "");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+        }
+
+        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, " ");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+        }
+    }
+
+    @Test
+    public void validateGetAllMessages() throws Exception {
+        String groupName = "validateGetAllMessages";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void validateGetAllMessagesPattern() throws Exception {
+        String groupName = "validateGetAllMessagesPattern";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_1_0.TOPICS, "(fo.*)|(ba)");
+        runner.setProperty(ConsumeKafka_1_0.TOPIC_TYPE, "pattern");
+        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void validateGetErrorMessages() throws Exception {
+        String groupName = "validateGetErrorMessages";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(true, false);
+        when(mockLease.commit()).thenReturn(Boolean.FALSE);
+
+        ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+        verify(mockLease, times(2)).continuePolling();
+        verify(mockLease, times(1)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void testJaasConfiguration() throws Exception {
+        ConsumeKafka_1_0 consumeKafka = new ConsumeKafka_1_0();
+        TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo");
+        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST);
+
+        // SASL_PLAINTEXT alone is not valid: a JAAS configuration is required
+        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+        runner.assertNotValid();
+
+        // providing the Kerberos Service Name satisfies the requirement
+        runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka");
+        runner.assertValid();
+
+        // a user principal without a keytab is incomplete
+        runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "[email protected]");
+        runner.assertNotValid();
+
+        // the keytab must point to an existing file
+        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "not.A.File");
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties");
+        runner.assertValid();
+    }
+
+}
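The four mock-based tests above share one pattern: stub ConsumerPool.obtainConsumer(..) to hand out a
scripted ConsumerLease, run the processor once, then verify the lease lifecycle
(continuePolling/poll/commit/close). A minimal sketch of a helper that distils this pattern, assuming it
lives inside ConsumeKafkaTest (the group id and poll count are illustrative; this helper is not part of
the commit itself):

    // Sketch of a shared helper for the mock-lease tests above.
    private void runWithMockedPool(final int expectedPolls) {
        final ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() {
            @Override
            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
                return mockConsumerPool;  // field stubbed in setup()
            }
        };
        final TestRunner runner = TestRunners.newTestRunner(proc);
        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
        runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo");
        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, "helper");
        runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST);
        runner.run(1, false);

        // every lease obtained from the pool must be polled, committed once, and closed
        verify(mockLease, times(expectedPolls)).poll();
        verify(mockLease, times(1)).commit();
        verify(mockLease, times(1)).close();
    }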
