http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java index b375b34..3bf01eb 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java @@ -16,21 +16,24 @@ */ package org.apache.nifi.processors.kafka.pubsub; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.nifi.logging.ComponentLog; - import java.io.Closeable; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.regex.Pattern; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.KafkaException; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriterFactory; /** * A pool of Kafka Consumers for a given topic. 
Consumers can be obtained by @@ -49,6 +52,8 @@ public class ConsumerPool implements Closeable { private final String keyEncoding; private final String securityProtocol; private final String bootstrapServers; + private final RecordReaderFactory readerFactory; + private final RecordSetWriterFactory writerFactory; private final AtomicLong consumerCreatedCountRef = new AtomicLong(); private final AtomicLong consumerClosedCountRef = new AtomicLong(); private final AtomicLong leasesObtainedCountRef = new AtomicLong(); @@ -93,6 +98,8 @@ public class ConsumerPool implements Closeable { this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties); this.topics = Collections.unmodifiableList(topics); this.topicPattern = null; + this.readerFactory = null; + this.writerFactory = null; } public ConsumerPool( @@ -115,6 +122,56 @@ public class ConsumerPool implements Closeable { this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties); this.topics = null; this.topicPattern = topics; + this.readerFactory = null; + this.writerFactory = null; + } + + public ConsumerPool( + final int maxConcurrentLeases, + final RecordReaderFactory readerFactory, + final RecordSetWriterFactory writerFactory, + final Map<String, Object> kafkaProperties, + final Pattern topics, + final long maxWaitMillis, + final String securityProtocol, + final String bootstrapServers, + final ComponentLog logger) { + this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases); + this.maxWaitMillis = maxWaitMillis; + this.logger = logger; + this.demarcatorBytes = null; + this.keyEncoding = null; + this.readerFactory = readerFactory; + this.writerFactory = writerFactory; + this.securityProtocol = securityProtocol; + this.bootstrapServers = bootstrapServers; + this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties); + this.topics = null; + this.topicPattern = topics; + } + + public ConsumerPool( + final int maxConcurrentLeases, + final RecordReaderFactory readerFactory, + final RecordSetWriterFactory writerFactory, + final Map<String, Object> kafkaProperties, + final List<String> topics, + final long maxWaitMillis, + final String securityProtocol, + final String bootstrapServers, + final ComponentLog logger) { + this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases); + this.maxWaitMillis = maxWaitMillis; + this.logger = logger; + this.demarcatorBytes = null; + this.keyEncoding = null; + this.readerFactory = readerFactory; + this.writerFactory = writerFactory; + this.securityProtocol = securityProtocol; + this.bootstrapServers = bootstrapServers; + this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties); + this.topics = topics; + this.topicPattern = null; } /** @@ -122,10 +179,12 @@ public class ConsumerPool implements Closeable { * initializes a new one if deemed necessary. 
* * @param session the session for which the consumer lease will be - * associated + * associated + * @param processContext the ProcessContext for which the consumer + * lease will be associated * @return consumer to use or null if not available or necessary */ - public ConsumerLease obtainConsumer(final ProcessSession session) { + public ConsumerLease obtainConsumer(final ProcessSession session, final ProcessContext processContext) { SimpleConsumerLease lease = pooledLeases.poll(); if (lease == null) { final Consumer<byte[], byte[]> consumer = createKafkaConsumer(); @@ -150,7 +209,8 @@ public class ConsumerPool implements Closeable { consumer.subscribe(topicPattern, lease); } } - lease.setProcessSession(session); + lease.setProcessSession(session, processContext); + leasesObtainedCountRef.incrementAndGet(); return lease; } @@ -200,15 +260,24 @@ public class ConsumerPool implements Closeable { private final Consumer<byte[], byte[]> consumer; private volatile ProcessSession session; + private volatile ProcessContext processContext; private volatile boolean closedConsumer; private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) { - super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers, logger); + super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers, readerFactory, writerFactory, logger); this.consumer = consumer; } - void setProcessSession(final ProcessSession session) { + void setProcessSession(final ProcessSession session, final ProcessContext context) { this.session = session; + this.processContext = context; + } + + @Override + public void yield() { + if (processContext != null) { + processContext.yield(); + } } @Override @@ -229,7 +298,7 @@ public class ConsumerPool implements Closeable { super.close(); if (session != null) { session.rollback(); - setProcessSession(null); + setProcessSession(null, null); } if (forceClose || isPoisoned() || !pooledLeases.offer(this)) { closedConsumer = true;
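For context, here is a minimal caller sketch, not part of this commit, showing how a consuming processor's onTrigger would drive the revised obtainConsumer(session, context) API; the names "pool", "session", and "context" are assumed, and the continuePolling/poll/commit cycle mirrors what the tests below exercise on the lease:

    // Hedged sketch; "pool", "session", and "context" are assumed to be in scope.
    try (final ConsumerLease lease = pool.obtainConsumer(session, context)) {
        if (lease == null) {
            context.yield(); // no consumer available or necessary; let the framework back off
            return;
        }
        while (lease.continuePolling()) {
            lease.poll();
        }
        lease.commit(); // lease.close() happens via try-with-resources
    }
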
http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java new file mode 100644 index 0000000..f96a575 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.kafka.pubsub; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.FlowFileFilters; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import 
org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.RecordWriter; + +@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "0.10.x"}) +@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 0.10.x Producer API. " + + "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. " + + " Please note there are cases where the publisher can get into an indefinite stuck state. We are closely monitoring" + + " how this evolves in the Kafka community and will take advantage of those fixes as soon as we can. In the meantime" + + " it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on. The complementary NiFi " + + "processor for fetching messages is ConsumeKafkaRecord_0_10.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", + description = "These properties will be added to the Kafka configuration after loading any provided configuration properties." + + " In the event a dynamic property represents a property that was already set, its value will be ignored and a WARN message logged." + + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ") +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to " + + "FlowFiles that are routed to success.") +@SeeAlso({PublishKafka_0_10.class, ConsumeKafka_0_10.class, ConsumeKafkaRecord_0_10.class}) +public class PublishKafkaRecord_0_10 extends AbstractProcessor { + protected static final String MSG_COUNT = "msg.count"; + + static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", + "FlowFile will be routed to failure unless the message is replicated to the appropriate " + + "number of Kafka Nodes according to the Topic configuration"); + static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", + "FlowFile will be routed to success if the message is received by a single Kafka node, " + + "whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> " + + "but can result in data loss if a Kafka node crashes"); + static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", + "FlowFile will be routed to success after successfully writing the content to a Kafka node, " + + "without waiting for a response. 
This provides the best performance but may result in data loss."); + + static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(Partitioners.RoundRobinPartitioner.class.getName(), + Partitioners.RoundRobinPartitioner.class.getSimpleName(), + "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, " + + "the next message to Partition 2, and so on, wrapping as necessary."); + static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner", + "DefaultPartitioner", "Messages will be assigned to random partitions."); + + static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string."); + static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded", + "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters."); + + static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() + .name("topic") + .displayName("Topic Name") + .description("The name of the Kafka Topic to publish to.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("The Record Reader to use for incoming FlowFiles") + .identifiesControllerService(RecordReaderFactory.class) + .expressionLanguageSupported(false) + .required(true) + .build(); + + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("record-writer") + .displayName("Record Writer") + .description("The Record Writer to use in order to serialize the data before sending to Kafka") + .identifiesControllerService(RecordSetWriterFactory.class) + .expressionLanguageSupported(false) + .required(true) + .build(); + + static final PropertyDescriptor MESSAGE_KEY_FIELD = new PropertyDescriptor.Builder() + .name("message-key-field") + .displayName("Message Key Field") + .description("The name of a field in the Input Records that should be used as the Key for the Kafka message.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(false) + .build(); + + static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() + .name("acks") + .displayName("Delivery Guarantee") + .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.") + .required(true) + .expressionLanguageSupported(false) + .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) + .defaultValue(DELIVERY_BEST_EFFORT.getValue()) + .build(); + + static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder() + .name("max.block.ms") + .displayName("Max Metadata Wait Time") + .description("The amount of time the publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the " + + "entire 'send' call. 
Corresponds to Kafka's 'max.block.ms' property") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(true) + .defaultValue("5 sec") + .build(); + + static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder() + .name("ack.wait.time") + .displayName("Acknowledgment Wait Time") + .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. " + + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .required(true) + .defaultValue("5 secs") + .build(); + + static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder() + .name("max.request.size") + .displayName("Max Request Size") + .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("1 MB") + .build(); + + static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder() + .name("partitioner.class") + .displayName("Partitioner class") + .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.") + .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING) + .defaultValue(RANDOM_PARTITIONING.getValue()) + .required(false) + .build(); + + static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() + .name("compression.type") + .displayName("Compression Type") + .description("This parameter allows you to specify the compression codec for all data generated by this producer.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues("none", "gzip", "snappy", "lz4") + .defaultValue("none") + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles for which all content was sent to Kafka.") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") + .build(); + + private static final List<PropertyDescriptor> PROPERTIES; + private static final Set<Relationship> RELATIONSHIPS; + + private volatile PublisherPool publisherPool = null; + + static { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(KafkaProcessorUtils.BOOTSTRAP_SERVERS); + properties.add(TOPIC); + properties.add(RECORD_READER); + properties.add(RECORD_WRITER); + properties.add(KafkaProcessorUtils.SECURITY_PROTOCOL); + properties.add(KafkaProcessorUtils.KERBEROS_PRINCIPLE); + properties.add(KafkaProcessorUtils.USER_PRINCIPAL); + properties.add(KafkaProcessorUtils.USER_KEYTAB); + properties.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE); + properties.add(DELIVERY_GUARANTEE); + properties.add(MESSAGE_KEY_FIELD); + properties.add(MAX_REQUEST_SIZE); + properties.add(ACK_WAIT_TIME); + properties.add(METADATA_WAIT_TIME); + properties.add(PARTITION_CLASS); + properties.add(COMPRESSION_CODEC); + + PROPERTIES = Collections.unmodifiableList(properties); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + RELATIONSHIPS = 
Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") + .name(propertyDescriptorName) + .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class)) + .dynamic(true) + .build(); + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + return KafkaProcessorUtils.validateCommonProperties(validationContext); + } + + private synchronized PublisherPool getPublisherPool(final ProcessContext context) { + PublisherPool pool = publisherPool; + if (pool != null) { + return pool; + } + + return publisherPool = createPublisherPool(context); + } + + protected PublisherPool createPublisherPool(final ProcessContext context) { + final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue(); + final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue(); + + final Map<String, Object> kafkaProperties = new HashMap<>(); + KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties); + kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize)); + + return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis); + } + + @OnStopped + public void closePool() { + if (publisherPool != null) { + publisherPool.close(); + } + + publisherPool = null; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(1, DataUnit.MB, 500)); + if (flowFiles.isEmpty()) { + return; + } + + final PublisherPool pool = getPublisherPool(context); + if (pool == null) { + context.yield(); + return; + } + + final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(); + final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); + + final long startTime = System.nanoTime(); + try (final PublisherLease lease = pool.obtainPublisher()) { + // Send each FlowFile to Kafka asynchronously. 
+ for (final FlowFile flowFile : flowFiles) { + if (!isScheduled()) { + // If stopped, re-queue FlowFile instead of sending it + session.transfer(flowFile); + continue; + } + + final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); + final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue(); + + final RecordWriter writer; + try (final InputStream in = new BufferedInputStream(session.read(flowFile))) { + writer = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class).createWriter(getLogger(), flowFile, in); + } catch (final Exception e) { + getLogger().error("Failed to create a Record Writer for {}; routing to failure", new Object[] {flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + continue; + } + + try { + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream rawIn) throws IOException { + try (final InputStream in = new BufferedInputStream(rawIn)) { + final RecordReader reader = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class).createRecordReader(flowFile, in, getLogger()); + lease.publish(flowFile, reader, writer, messageKeyField, topic); + } catch (final SchemaNotFoundException | MalformedRecordException e) { + throw new ProcessException(e); + } + } + }); + } catch (final Exception e) { + // The FlowFile will be obtained and the error logged below, when calling publishResult.getFailedFlowFiles() + lease.getTracker().fail(flowFile, e); + continue; + } + } + + // Complete the send + final PublishResult publishResult = lease.complete(); + + // Transfer any successful FlowFiles. + final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); + for (FlowFile success : publishResult.getSuccessfulFlowFiles()) { + final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue(); + + final int msgCount = publishResult.getSuccessfulMessageCount(success); + success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount)); + session.adjustCounter("Messages Sent", msgCount, true); + + final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic); + session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis); + session.transfer(success, REL_SUCCESS); + } + + // Transfer any failures. + for (final FlowFile failure : publishResult.getFailedFlowFiles()) { + final int successCount = publishResult.getSuccessfulMessageCount(failure); + if (successCount > 0) { + getLogger().error("Failed to send some messages for {} to Kafka, but {} messages were acknowledged by Kafka. 
Routing to failure due to {}", + new Object[] {failure, successCount, publishResult.getReasonForFailure(failure)}); + } else { + getLogger().error("Failed to send all messages for {} to Kafka; routing to failure due to {}", + new Object[] {failure, publishResult.getReasonForFailure(failure)}); + } + + session.transfer(failure, REL_FAILURE); + } + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index b67e8a8..f08f7a9 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -17,11 +17,14 @@ package org.apache.nifi.processors.kafka.pubsub; +import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; @@ -29,6 +32,10 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordWriter; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSet; import org.apache.nifi.stream.io.exception.TokenTooLargeException; import org.apache.nifi.stream.io.util.StreamDemarcator; @@ -38,6 +45,7 @@ public class PublisherLease implements Closeable { private final int maxMessageSize; private final long maxAckWaitMillis; private volatile boolean poisoned = false; + private final AtomicLong messagesSent = new AtomicLong(0L); private InFlightMessageTracker tracker; @@ -85,7 +93,42 @@ public class PublisherLease implements Closeable { } } - private void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) { + void publish(final FlowFile flowFile, final RecordReader reader, final RecordWriter writer, final String messageKeyField, final String topic) throws IOException { + if (tracker == null) { + tracker = new InFlightMessageTracker(); + } + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); + + Record record; + final RecordSet recordSet = reader.createRecordSet(); + + try { + while ((record = recordSet.next()) != null) { + baos.reset(); + writer.write(record, baos); + + final byte[] messageContent = baos.toByteArray(); + final String key = messageKeyField == null ? null : record.getAsString(messageKeyField); + final byte[] messageKey = (key == null) ? 
null : key.getBytes(StandardCharsets.UTF_8); + + publish(flowFile, messageKey, messageContent, topic, tracker); + + if (tracker.isFailed(flowFile)) { + // If we have a failure, don't try to send anything else. + return; + } + } + } catch (final TokenTooLargeException ttle) { + tracker.fail(flowFile, ttle); + } catch (final Exception e) { + tracker.fail(flowFile, e); + poison(); + throw e; + } + } + + protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) { final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, messageKey, messageContent); producer.send(record, new Callback() { @Override @@ -99,11 +142,17 @@ public class PublisherLease implements Closeable { } }); + messagesSent.incrementAndGet(); tracker.incrementSentCount(flowFile); } + public PublishResult complete() { if (tracker == null) { + if (messagesSent.get() == 0L) { + return PublishResult.EMPTY; + } + throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed"); } @@ -129,4 +178,8 @@ public class PublisherLease implements Closeable { producer.close(maxAckWaitMillis, TimeUnit.MILLISECONDS); tracker = null; } + + public InFlightMessageTracker getTracker() { + return tracker; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index aa1d4e2..6da2282 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -13,4 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
org.apache.nifi.processors.kafka.pubsub.PublishKafka_0_10 -org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10 \ No newline at end of file +org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_0_10 +org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10 +org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index 4a5c4fb..cc524dd 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -16,18 +16,8 @@ */ package org.apache.nifi.processors.kafka.pubsub; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; - -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Test; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import org.junit.Before; import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -35,15 +25,22 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + public class ConsumeKafkaTest { - Consumer<byte[], byte[]> mockConsumer = null; ConsumerLease mockLease = null; ConsumerPool mockConsumerPool = null; @Before public void setup() { - mockConsumer = mock(Consumer.class); mockLease = mock(ConsumerLease.class); mockConsumerPool = mock(ConsumerPool.class); } @@ -106,7 +103,7 @@ public class ConsumeKafkaTest { public void validateGetAllMessages() throws Exception { String groupName = "validateGetAllMessages"; - when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease); + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); when(mockLease.commit()).thenReturn(Boolean.TRUE); @@ -124,7 +121,7 @@ public class ConsumeKafkaTest { runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); runner.run(1, false); - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject()); + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); verify(mockLease, 
times(3)).continuePolling(); verify(mockLease, times(2)).poll(); verify(mockLease, times(1)).commit(); @@ -137,7 +134,7 @@ public class ConsumeKafkaTest { public void validateGetAllMessagesPattern() throws Exception { String groupName = "validateGetAllMessagesPattern"; - when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease); + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); when(mockLease.commit()).thenReturn(Boolean.TRUE); @@ -156,7 +153,7 @@ public class ConsumeKafkaTest { runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); runner.run(1, false); - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject()); + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); verify(mockLease, times(3)).continuePolling(); verify(mockLease, times(2)).poll(); verify(mockLease, times(1)).commit(); @@ -169,7 +166,7 @@ public class ConsumeKafkaTest { public void validateGetErrorMessages() throws Exception { String groupName = "validateGetErrorMessages"; - when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease); + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); when(mockLease.continuePolling()).thenReturn(true, false); when(mockLease.commit()).thenReturn(Boolean.FALSE); @@ -187,7 +184,7 @@ public class ConsumeKafkaTest { runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); runner.run(1, false); - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject()); + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); verify(mockLease, times(2)).continuePolling(); verify(mockLease, times(1)).poll(); verify(mockLease, times(1)).commit(); http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java index 0ebf2b3..12a137e 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.provenance.ProvenanceReporter; import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats; @@ -36,6 +37,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; + import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -44,14 +47,16 @@ import static org.mockito.Mockito.when; 
public class ConsumerPoolTest { - Consumer<byte[], byte[]> consumer = null; - ProcessSession mockSession = null; - ProvenanceReporter mockReporter = null; - ConsumerPool testPool = null; - ConsumerPool testDemarcatedPool = null; - ComponentLog logger = null; + private Consumer<byte[], byte[]> consumer = null; + private ProcessSession mockSession = null; + private ProcessContext mockContext = Mockito.mock(ProcessContext.class); + private ProvenanceReporter mockReporter = null; + private ConsumerPool testPool = null; + private ConsumerPool testDemarcatedPool = null; + private ComponentLog logger = null; @Before + @SuppressWarnings("unchecked") public void setup() { consumer = mock(Consumer.class); logger = mock(ComponentLog.class); @@ -94,16 +99,16 @@ public class ConsumerPoolTest { public void validatePoolSimpleCreateClose() throws Exception { when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{})); - try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) { + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { lease.poll(); } - try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) { + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { lease.poll(); } - try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) { + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { lease.poll(); } - try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) { + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { lease.poll(); } testPool.close(); @@ -125,7 +130,7 @@ public class ConsumerPoolTest { final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues); when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{})); - try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) { + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { lease.poll(); lease.commit(); } @@ -142,7 +147,7 @@ public class ConsumerPoolTest { public void validatePoolSimpleBatchCreateClose() throws Exception { when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{})); for (int i = 0; i < 100; i++) { - try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) { + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { for (int j = 0; j < 100; j++) { lease.poll(); } @@ -167,7 +172,7 @@ public class ConsumerPoolTest { final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues); when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{})); - try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession)) { + try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession, mockContext)) { lease.poll(); lease.commit(); } @@ -184,7 +189,7 @@ public class ConsumerPoolTest { public void validatePoolConsumerFails() throws Exception { when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops")); - try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) { + try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) { try { lease.poll(); fail(); 
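A minimal sketch, assuming concrete factory instances and property values, of how the new record-aware ConsumerPool constructor added above would be invoked (argument order as in the List<String> variant):

    // Hedged sketch; readerFactory, writerFactory, kafkaProperties, and logger are assumed.
    final ConsumerPool pool = new ConsumerPool(
            10,                                     // maxConcurrentLeases
            readerFactory,                          // RecordReaderFactory controller service
            writerFactory,                          // RecordSetWriterFactory controller service
            kafkaProperties,                        // e.g. bootstrap.servers, group.id, deserializers
            Collections.singletonList("my-topic"),  // topics (assumed name)
            30000L,                                 // maxWaitMillis
            "PLAINTEXT",                            // securityProtocol
            "localhost:9092",                       // bootstrapServers (assumed)
            logger);                                // ComponentLog
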
http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java new file mode 100644 index 0000000..da63877 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser; +import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class TestConsumeKafkaRecord_0_10 { + + private ConsumerLease mockLease = null; + private ConsumerPool mockConsumerPool = null; + private TestRunner runner; + + @Before + public void setup() throws InitializationException { + mockLease = mock(ConsumerLease.class); + mockConsumerPool = mock(ConsumerPool.class); + + ConsumeKafkaRecord_0_10 proc = new ConsumeKafkaRecord_0_10() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + + runner = TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234"); + + final String readerId = "record-reader"; + final MockRecordParser readerService = new MockRecordParser(); + readerService.addSchemaField("name", 
RecordFieldType.STRING); + readerService.addSchemaField("age", RecordFieldType.INT); + runner.addControllerService(readerId, readerService); + runner.enableControllerService(readerService); + + final String writerId = "record-writer"; + final RecordSetWriterFactory writerService = new MockRecordWriter("name, age"); + runner.addControllerService(writerId, writerService); + runner.enableControllerService(writerService); + + runner.setProperty(ConsumeKafkaRecord_0_10.RECORD_READER, readerId); + runner.setProperty(ConsumeKafkaRecord_0_10.RECORD_WRITER, writerId); + } + + @Test + public void validateCustomValidatorSettings() throws Exception { + runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo"); + runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, "foo"); + runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST); + runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + runner.assertValid(); + runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Foo"); + runner.assertNotValid(); + runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + runner.assertValid(); + runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + runner.assertValid(); + runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + runner.assertNotValid(); + } + + @Test + public void validatePropertiesValidation() throws Exception { + runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo"); + runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, "foo"); + runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST); + + runner.removeProperty(ConsumeKafkaRecord_0_10.GROUP_ID); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("invalid because Group ID is required")); + } + + runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, ""); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); + } + + runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, " "); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); + } + } + + @Test + public void validateGetAllMessages() throws Exception { + String groupName = "validateGetAllMessages"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); + when(mockLease.commit()).thenReturn(Boolean.TRUE); + + runner.setValidateExpressionUsage(false); + runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, groupName); + runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(3)).continuePolling(); + verify(mockLease, times(2)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + + @Test + public void validateGetAllMessagesPattern() throws Exception { + String groupName = 
"validateGetAllMessagesPattern"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); + when(mockLease.commit()).thenReturn(Boolean.TRUE); + + runner.setValidateExpressionUsage(false); + runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "(fo.*)|(ba)"); + runner.setProperty(ConsumeKafkaRecord_0_10.TOPIC_TYPE, "pattern"); + runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, groupName); + runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(3)).continuePolling(); + verify(mockLease, times(2)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + + @Test + public void validateGetErrorMessages() throws Exception { + String groupName = "validateGetErrorMessages"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(true, false); + when(mockLease.commit()).thenReturn(Boolean.FALSE); + + runner.setValidateExpressionUsage(false); + runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, groupName); + runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(2)).continuePolling(); + verify(mockLease, times(1)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + + @Test + public void testJaasConfiguration() throws Exception { + runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo"); + runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, "foo"); + runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST); + + runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT); + runner.assertNotValid(); + + runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka"); + runner.assertValid(); + + runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "[email protected]"); + runner.assertNotValid(); + + runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "not.A.File"); + runner.assertNotValid(); + + runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties"); + runner.assertValid(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java new file mode 100644 index 0000000..c1df792 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.kafka.pubsub; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser; +import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.RecordWriter; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestPublishKafkaRecord_0_10 { + + private static final String TOPIC_NAME = "unit-test"; + + private PublisherPool mockPool; + private PublisherLease mockLease; + private TestRunner runner; + + @Before + public void setup() throws InitializationException, IOException { + mockPool = mock(PublisherPool.class); + mockLease = mock(PublisherLease.class); + Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), any(String.class), any(String.class)); + + when(mockPool.obtainPublisher()).thenReturn(mockLease); + + runner = TestRunners.newTestRunner(new PublishKafkaRecord_0_10() { + @Override + protected PublisherPool createPublisherPool(final ProcessContext context) { + return mockPool; + } + }); + + runner.setProperty(PublishKafkaRecord_0_10.TOPIC, TOPIC_NAME); + + final String readerId = "record-reader"; + final MockRecordParser readerService = new MockRecordParser(); + readerService.addSchemaField("name", RecordFieldType.STRING); + readerService.addSchemaField("age", RecordFieldType.INT); + runner.addControllerService(readerId, readerService); + 
runner.enableControllerService(readerService); + + final String writerId = "record-writer"; + final RecordSetWriterFactory writerService = new MockRecordWriter("name, age"); + runner.addControllerService(writerId, writerService); + runner.enableControllerService(writerService); + + runner.setProperty(PublishKafkaRecord_0_10.RECORD_READER, readerId); + runner.setProperty(PublishKafkaRecord_0_10.RECORD_WRITER, writerId); + } + + @Test + public void testSingleSuccess() throws IOException { + final MockFlowFile flowFile = runner.enqueue("John Doe, 48"); + + when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFile, 1)); + + runner.run(); + runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1); + + verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(1)).complete(); + verify(mockLease, times(0)).poison(); + verify(mockLease, times(1)).close(); + } + + @Test + public void testMultipleSuccess() throws IOException { + final Set<FlowFile> flowFiles = new HashSet<>(); + flowFiles.add(runner.enqueue("John Doe, 48")); + flowFiles.add(runner.enqueue("John Doe, 48")); + flowFiles.add(runner.enqueue("John Doe, 48")); + + + when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFiles, 1)); + + runner.run(); + runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 3); + + verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(1)).complete(); + verify(mockLease, times(0)).poison(); + verify(mockLease, times(1)).close(); + } + + @Test + public void testSingleFailure() throws IOException { + final MockFlowFile flowFile = runner.enqueue("John Doe, 48"); + + when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile)); + + runner.run(); + runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 1); + + verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(1)).complete(); + verify(mockLease, times(1)).close(); + } + + @Test + public void testMultipleFailures() throws IOException { + final Set<FlowFile> flowFiles = new HashSet<>(); + flowFiles.add(runner.enqueue("John Doe, 48")); + flowFiles.add(runner.enqueue("John Doe, 48")); + flowFiles.add(runner.enqueue("John Doe, 48")); + + when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles)); + + runner.run(); + runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 3); + + verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME)); + verify(mockLease, times(1)).complete(); + verify(mockLease, times(1)).close(); + } + + @Test + public void testMultipleMessagesPerFlowFile() throws IOException { + final List<FlowFile> flowFiles = new ArrayList<>(); + flowFiles.add(runner.enqueue("John Doe, 48\nJane Doe, 47")); + flowFiles.add(runner.enqueue("John Doe, 48\nJane Doe, 29")); + + final Map<FlowFile, Integer> msgCounts = new HashMap<>(); + msgCounts.put(flowFiles.get(0), 10); + msgCounts.put(flowFiles.get(1), 20); + + final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles), Collections.emptyMap()); + + when(mockLease.complete()).thenReturn(result); + + runner.run(); + 
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 2);
+
+        verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+
+        runner.assertAllFlowFilesContainAttribute("msg.count");
+        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_0_10.REL_SUCCESS).stream()
+            .filter(ff -> ff.getAttribute("msg.count").equals("10"))
+            .count());
+        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_0_10.REL_SUCCESS).stream()
+            .filter(ff -> ff.getAttribute("msg.count").equals("20"))
+            .count());
+    }
+
+    @Test
+    public void testSomeSuccessSomeFailure() throws IOException {
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        msgCounts.put(flowFiles.get(0), 10);
+        msgCounts.put(flowFiles.get(1), 20);
+
+        final Map<FlowFile, Exception> failureMap = new HashMap<>();
+        failureMap.put(flowFiles.get(2), new RuntimeException("Intentional Unit Test Exception"));
+        failureMap.put(flowFiles.get(3), new RuntimeException("Intentional Unit Test Exception"));
+
+        final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles.subList(0, 2)), failureMap);
+
+        when(mockLease.complete()).thenReturn(result);
+
+        runner.run();
+        runner.assertTransferCount(PublishKafkaRecord_0_10.REL_SUCCESS, 2);
+        runner.assertTransferCount(PublishKafkaRecord_0_10.REL_FAILURE, 2);
+
+        verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(1)).close();
+
+        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_0_10.REL_SUCCESS).stream()
+            .filter(ff -> "10".equals(ff.getAttribute("msg.count")))
+            .count());
+        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_0_10.REL_SUCCESS).stream()
+            .filter(ff -> "20".equals(ff.getAttribute("msg.count")))
+            .count());
+
+        assertTrue(runner.getFlowFilesForRelationship(PublishKafkaRecord_0_10.REL_FAILURE).stream()
+            .noneMatch(ff -> ff.getAttribute("msg.count") != null));
+    }
+
+    private PublishResult createAllSuccessPublishResult(final FlowFile successfulFlowFile, final int msgCount) {
+        return createAllSuccessPublishResult(Collections.singleton(successfulFlowFile), msgCount);
+    }
+
+    private PublishResult createAllSuccessPublishResult(final Set<FlowFile> successfulFlowFiles, final int msgCountPerFlowFile) {
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        for (final FlowFile ff : successfulFlowFiles) {
+            msgCounts.put(ff, msgCountPerFlowFile);
+        }
+        return createPublishResult(msgCounts, successfulFlowFiles, Collections.emptyMap());
+    }
+
+    private PublishResult createFailurePublishResult(final FlowFile failure) {
+        return createFailurePublishResult(Collections.singleton(failure));
+    }
+
+    private PublishResult createFailurePublishResult(final Set<FlowFile> failures) {
+        final Map<FlowFile, Exception> failureMap =
+                failures.stream().collect(Collectors.toMap(ff -> ff, ff -> new RuntimeException("Intentional Unit Test Exception")));
+        return createPublishResult(Collections.emptyMap(), Collections.emptySet(), failureMap);
+    }
+
+    private PublishResult createPublishResult(final Map<FlowFile, Integer> msgCounts, final Set<FlowFile> successFlowFiles, final Map<FlowFile, Exception> failures) {
+        // sanity check.
+        for (final FlowFile success : successFlowFiles) {
+            if (failures.containsKey(success)) {
+                throw new IllegalArgumentException("Found same FlowFile in both 'success' and 'failures' collections: " + success);
+            }
+        }
+
+        return new PublishResult() {
+            @Override
+            public Collection<FlowFile> getSuccessfulFlowFiles() {
+                return successFlowFiles;
+            }
+
+            @Override
+            public Collection<FlowFile> getFailedFlowFiles() {
+                return failures.keySet();
+            }
+
+            @Override
+            public int getSuccessfulMessageCount(FlowFile flowFile) {
+                Integer count = msgCounts.get(flowFile);
+                return count == null ? 0 : count.intValue();
+            }
+
+            @Override
+            public Exception getReasonForFailure(FlowFile flowFile) {
+                return failures.get(flowFile);
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
new file mode 100644
index 0000000..b47bece
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory {
+    private final List<Object[]> records = new ArrayList<>();
+    private final List<RecordField> fields = new ArrayList<>();
+    private final int failAfterN;
+
+    public MockRecordParser() {
+        this(-1);
+    }
+
+    public MockRecordParser(final int failAfterN) {
+        this.failAfterN = failAfterN;
+    }
+
+    public void addSchemaField(final String fieldName, final RecordFieldType type) {
+        fields.add(new RecordField(fieldName, type.getDataType()));
+    }
+
+    // Note: records added here are not consumed by createRecordReader below,
+    // which parses the supplied InputStream instead.
+    public void addRecord(Object... values) {
+        records.add(values);
+    }
+
+    @Override
+    public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
+        final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+
+        return new RecordReader() {
+            private int recordCount = 0;
+
+            @Override
+            public void close() throws IOException {
+            }
+
+            @Override
+            public Record nextRecord() throws IOException, MalformedRecordException {
+                // Fail only once the configured number of records has been read;
+                // a negative failAfterN disables failure injection.
+                if (failAfterN >= 0 && recordCount >= failAfterN) {
+                    throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read");
+                }
+                final String line = reader.readLine();
+                if (line == null) {
+                    return null;
+                }
+
+                recordCount++;
+
+                final String[] values = line.split(",");
+                final Map<String, Object> valueMap = new HashMap<>();
+                int i = 0;
+                for (final RecordField field : fields) {
+                    final String fieldName = field.getFieldName();
+                    valueMap.put(fieldName, values[i++].trim());
+                }
+
+                return new MapRecord(new SimpleRecordSchema(fields), valueMap);
+            }
+
+            @Override
+            public RecordSchema getSchema() {
+                return new SimpleRecordSchema(fields);
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
new file mode 100644
index 0000000..22d7249
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+public class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
+    private final String header;
+    private final int failAfterN;
+    private final boolean quoteValues;
+
+    public MockRecordWriter(final String header) {
+        this(header, true, -1);
+    }
+
+    public MockRecordWriter(final String header, final boolean quoteValues) {
+        this(header, quoteValues, -1);
+    }
+
+    public MockRecordWriter(final String header, final boolean quoteValues, final int failAfterN) {
+        this.header = header;
+        this.quoteValues = quoteValues;
+        this.failAfterN = failAfterN;
+    }
+
+    @Override
+    public RecordSetWriter createWriter(final ComponentLog logger, final FlowFile flowFile, final InputStream in) {
+        return new RecordSetWriter() {
+            @Override
+            public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
+                out.write(header.getBytes());
+                out.write("\n".getBytes());
+
+                int recordCount = 0;
+                final int numCols = rs.getSchema().getFieldCount();
+                Record record = null;
+                while ((record = rs.next()) != null) {
+                    if (++recordCount > failAfterN && failAfterN > -1) {
+                        throw new IOException("Unit Test intentionally throwing IOException after " + failAfterN + " records were written");
+                    }
+
+                    int i = 0;
+                    for (final String fieldName : record.getSchema().getFieldNames()) {
+                        final String val = record.getAsString(fieldName);
+                        if (quoteValues) {
+                            out.write("\"".getBytes());
+                            if (val != null) {
+                                out.write(val.getBytes());
+                            }
+                            out.write("\"".getBytes());
+                        } else if (val != null) {
+                            out.write(val.getBytes());
+                        }
+
+                        if (i++ < numCols - 1) {
+                            out.write(",".getBytes());
+                        }
+                    }
+                    out.write("\n".getBytes());
+                }
+
+                return WriteResult.of(recordCount, Collections.emptyMap());
+            }
+
+            @Override
+            public String getMimeType() {
+                return "text/plain";
+            }
+
+            @Override
+            public WriteResult write(Record record, OutputStream out) throws IOException {
+                // single-record writes are not exercised by these tests
+                return null;
+            }
+        };
+    }
+}
\ No newline at end of file
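Taken together, these two mocks let a test inject parse or write failures after a set number of records. A minimal sketch of that usage, assuming the same TestRunner wiring shown in TestPublishKafkaRecord_0_10 above (the service IDs and CSV payload here are illustrative, not part of this commit):

    // Sketch: a reader that throws MalformedRecordException once two records
    // have been read, so the third record drives the processor's failure path.
    final MockRecordParser readerService = new MockRecordParser(2);
    readerService.addSchemaField("name", RecordFieldType.STRING);
    readerService.addSchemaField("age", RecordFieldType.INT);

    final MockRecordWriter writerService = new MockRecordWriter("name, age");

    final TestRunner runner = TestRunners.newTestRunner(PublishKafkaRecord_0_10.class);
    runner.addControllerService("record-reader", readerService);
    runner.enableControllerService(readerService);
    runner.addControllerService("record-writer", writerService);
    runner.enableControllerService(writerService);
    runner.setProperty(PublishKafkaRecord_0_10.RECORD_READER, "record-reader");
    runner.setProperty(PublishKafkaRecord_0_10.RECORD_WRITER, "record-writer");
    runner.setProperty(PublishKafkaRecord_0_10.TOPIC, "unit-test");

    // Three CSV records; with failAfterN = 2 the reader fails on the third.
    runner.enqueue("John Doe, 48\nJane Doe, 47\nJim Doe, 14");
    runner.run();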
http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
index 8fcb016..f48d0f5 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
@@ -23,6 +23,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 
 import org.apache.avro.LogicalType;
@@ -53,6 +55,7 @@
 public class AvroSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
     private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
     private final Map<String, String> schemaNameToSchemaMap;
+    private final ConcurrentMap<String, RecordSchema> recordSchemas = new ConcurrentHashMap<>();
 
     private static final String LOGICAL_TYPE_DATE = "date";
     private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis";
@@ -65,6 +68,21 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch
     }
 
     @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        if (newValue == null) {
+            recordSchemas.remove(descriptor.getName());
+        } else {
+            try {
+                final Schema avroSchema = new Schema.Parser().parse(newValue);
+                final RecordSchema recordSchema = createRecordSchema(avroSchema, newValue, descriptor.getName());
+                recordSchemas.put(descriptor.getName(), recordSchema);
+            } catch (final Exception e) {
+                // not a problem - the service won't be valid and the validation message will indicate what is wrong.
+            }
+        }
+    }
+
+    @Override
     public String retrieveSchemaText(final String schemaName) throws SchemaNotFoundException {
         final String schemaText = schemaNameToSchemaMap.get(schemaName);
         if (schemaText == null) {
@@ -76,9 +94,11 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch
     }
 
     @Override
     public RecordSchema retrieveSchema(final String schemaName) throws SchemaNotFoundException {
-        final String schemaText = retrieveSchemaText(schemaName);
-        final Schema schema = new Schema.Parser().parse(schemaText);
-        return createRecordSchema(schema, schemaText, schemaName);
+        final RecordSchema recordSchema = recordSchemas.get(schemaName);
+        if (recordSchema == null) {
+            throw new SchemaNotFoundException("Unable to find schema with name '" + schemaName + "'");
+        }
+        return recordSchema;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/07989b84/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
index a63097a..9121f04 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
@@ -21,15 +21,11 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -69,44 +65,4 @@ public class TestAvroSchemaRegistry {
 
         delegate.close();
     }
-
-
-    @Test
-    public void validateRecordSchemaRetrieval() throws Exception {
-        String schemaName = "fooSchema";
-        ConfigurationContext configContext = mock(ConfigurationContext.class);
-        Map<PropertyDescriptor, String> properties = new HashMap<>();
-        PropertyDescriptor fooSchema = new PropertyDescriptor.Builder()
-            .name(schemaName)
-            .dynamic(true)
-            .build();
-        String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", "
-            + "\"fields\": [ " + "{\"name\": \"name\", \"type\": [\"string\", \"null\"]}, "
-            + "{\"name\": \"favorite_number\", \"type\": \"int\"}, "
-            + "{\"name\": \"foo\", \"type\": \"boolean\"}, "
-            + "{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]} " + "]" + "}";
-        PropertyDescriptor barSchema = new PropertyDescriptor.Builder()
-            .name("barSchema")
-            .dynamic(false)
-            .build();
-        properties.put(fooSchema, fooSchemaText);
-        properties.put(barSchema, "");
-        when(configContext.getProperties()).thenReturn(properties);
-        AvroSchemaRegistry delegate = new AvroSchemaRegistry();
-        delegate.enable(configContext);
-
-        RecordSchema locatedSchema = delegate.retrieveSchema(schemaName);
-
-        List<RecordField> recordFields = locatedSchema.getFields();
-        assertEquals(4, recordFields.size());
-        assertEquals(RecordFieldType.STRING.getDataType(), recordFields.get(0).getDataType());
-        assertEquals("name", recordFields.get(0).getFieldName());
-        assertEquals(RecordFieldType.INT.getDataType(), recordFields.get(1).getDataType());
-        assertEquals("favorite_number", recordFields.get(1).getFieldName());
-        assertEquals(RecordFieldType.BOOLEAN.getDataType(), recordFields.get(2).getDataType());
-        assertEquals("foo", recordFields.get(2).getFieldName());
-        assertEquals(RecordFieldType.STRING.getDataType(), recordFields.get(3).getDataType());
-        assertEquals("favorite_color", recordFields.get(3).getFieldName());
-        delegate.close();
-    }
-
 }
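The AvroSchemaRegistry change above moves Avro parsing out of the retrieveSchema hot path: each dynamic property's schema text is parsed once in onPropertyModified and cached in a ConcurrentMap, so retrieval becomes a plain map lookup and invalid schema text simply never enters the cache. A self-contained sketch of that parse-once pattern, assuming only the avro library and java.util.concurrent (the class and method names below are illustrative, not NiFi's API):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    import org.apache.avro.Schema;

    // Illustrative parse-once cache: parse when the property changes,
    // serve the cached result on every lookup.
    class ParsedSchemaCache {
        private final ConcurrentMap<String, Schema> schemas = new ConcurrentHashMap<>();

        void onPropertyModified(final String name, final String newText) {
            if (newText == null) {
                schemas.remove(name);   // property removed: drop the stale entry
                return;
            }
            try {
                schemas.put(name, new Schema.Parser().parse(newText));
            } catch (final Exception e) {
                // invalid schema text never enters the cache; validation reports it
            }
        }

        Schema retrieve(final String name) {
            return schemas.get(name);   // no parsing on the lookup path
        }
    }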
