http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml new file mode 100644 index 0000000..0f51298 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml @@ -0,0 +1,92 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kafka-bundle</artifactId> + <version>1.5.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>nifi-kafka-1-0-processors</artifactId> + <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${kafka1.0.version}</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>${kafka1.0.version}</version> + <scope>test</scope> + <exclusions> + <!-- Transitive dependencies excluded because they are located + in a legacy Maven repository, which Maven 3 doesn't support. 
--> + <exclusion> + <groupId>javax.jms</groupId> + <artifactId>jms</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jdmk</groupId> + <artifactId>jmxtools</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jmx</groupId> + <artifactId>jmxri</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project>
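The kafka-clients and kafka_2.11 test dependencies above are versioned through the ${kafka1.0.version} property, which is not defined in this file; it presumably comes from the parent nifi-kafka-bundle pom named in the <parent> block. A minimal sketch of what that parent declaration could look like, assuming the property is pinned to the 1.0.0 client line (the value shown is an assumption, not part of this commit):

    <!-- hypothetical excerpt from the parent nifi-nar-bundles/nifi-kafka-bundle/pom.xml -->
    <properties>
        <!-- version applied to kafka-clients and the kafka_2.11 test dependency above -->
        <kafka1.0.version>1.0.0</kafka1.0.version>
    </properties>

The org.apache.nifi dependencies listed without versions are resolved the same way, through dependencyManagement inherited from the parent build.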
http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java new file mode 100644 index 0000000..f0c1bd0 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriterFactory; + +@CapabilityDescription("Consumes messages from Apache Kafka specifically built against 
the Kafka 1.0 Consumer API. " + + "The complementary NiFi processor for sending messages is PublishKafkaRecord_1_0. Please note that, at this time, the Processor assumes that " + + "all records that are retrieved from a given partition have the same schema. If any of the Kafka messages are pulled but cannot be parsed or written with the " + + "configured Record Reader or Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the " + + "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual messages within the single FlowFile. " + + "A 'record.count' attribute is added to indicate how many messages are contained in the FlowFile. No two Kafka messages will be placed into the same FlowFile if they " + + "have different schemas, or if they have different values for a message header that is included by the <Headers to Add as Attributes> property.") +@Tags({"Kafka", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "1.0"}) +@WritesAttributes({ + @WritesAttribute(attribute = "record.count", description = "The number of records received"), + @WritesAttribute(attribute = "mime.type", description = "The MIME Type that is provided by the configured Record Writer"), + @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the records are from"), + @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic records are from") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", + description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." + + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged." + + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ") +@SeeAlso({ConsumeKafka_1_0.class, PublishKafka_1_0.class, PublishKafkaRecord_1_0.class}) +public class ConsumeKafkaRecord_1_0 extends AbstractProcessor { + + static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset"); + static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset"); + static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group"); + static final AllowableValue TOPIC_NAME = new AllowableValue("names", "names", "Topic is a full topic name or comma separated list of names"); + static final AllowableValue TOPIC_PATTERN = new AllowableValue("pattern", "pattern", "Topic is a regex using the Java Pattern syntax"); + + static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder() + .name("topic") + .displayName("Topic Name(s)") + .description("The name of the Kafka Topic(s) to pull from. 
More than one can be supplied if comma separated.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + static final PropertyDescriptor TOPIC_TYPE = new PropertyDescriptor.Builder() + .name("topic_type") + .displayName("Topic Name Format") + .description("Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression") + .required(true) + .allowableValues(TOPIC_NAME, TOPIC_PATTERN) + .defaultValue(TOPIC_NAME.getValue()) + .build(); + + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("The Record Reader to use for parsing the received Kafka messages into Records") + .identifiesControllerService(RecordReaderFactory.class) + .expressionLanguageSupported(false) + .required(true) + .build(); + + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("record-writer") + .displayName("Record Writer") + .description("The Record Writer to use in order to serialize the data before writing it to a FlowFile") + .identifiesControllerService(RecordSetWriterFactory.class) + .expressionLanguageSupported(false) + .required(true) + .build(); + + static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder() + .name("group.id") + .displayName("Group ID") + .description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + + static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder() + .name("auto.offset.reset") + .displayName("Offset Reset") + .description("Allows you to manage the condition when there is no initial offset in Kafka or if the current offset does not exist any " + + "more on the server (e.g. because that data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.") + .required(true) + .allowableValues(OFFSET_EARLIEST, OFFSET_LATEST, OFFSET_NONE) + .defaultValue(OFFSET_LATEST.getValue()) + .build(); + + static final PropertyDescriptor MAX_POLL_RECORDS = new PropertyDescriptor.Builder() + .name("max.poll.records") + .displayName("Max Poll Records") + .description("Specifies the maximum number of records Kafka should return in a single poll.") + .required(false) + .defaultValue("10000") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor MAX_UNCOMMITTED_TIME = new PropertyDescriptor.Builder() + .name("max-uncommit-offset-wait") + .displayName("Max Uncommitted Time") + .description("Specifies the maximum amount of time allowed to pass before offsets must be committed. " + + "This value impacts how often offsets will be committed. Committing offsets less often increases " + + "throughput but also increases the window of potential data duplication in the event of a rebalance " + + "or JVM restart between commits. This value is also related to maximum poll records and the use " + + "of a message demarcator. 
When using a message demarcator, far more messages can remain uncommitted " + + "than when one is not used, because there is much less to keep track of in memory.") + .required(false) + .defaultValue("1 secs") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + static final PropertyDescriptor HONOR_TRANSACTIONS = new PropertyDescriptor.Builder() + .name("honor-transactions") + .displayName("Honor Transactions") + .description("Specifies whether or not NiFi should honor transactional guarantees when communicating with Kafka. If false, the Processor will use an \"isolation level\" of " + + "read_uncommitted. This means that messages will be received as soon as they are written to Kafka and will be pulled even if the producer cancels its transactions. If " + + "this value is true, NiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer must wait " + + "for the producer to finish its entire transaction instead of pulling as the messages become available.") + .expressionLanguageSupported(false) + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .build(); + static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new PropertyDescriptor.Builder() + .name("message-header-encoding") + .displayName("Message Header Encoding") + .description("Any message header that is found on a Kafka message will be added to the outbound FlowFile as an attribute. " + + "This property indicates the Character Encoding to use for deserializing the headers.") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue("UTF-8") + .required(false) + .build(); + static final PropertyDescriptor HEADER_NAME_REGEX = new PropertyDescriptor.Builder() + .name("header-name-regex") + .displayName("Headers to Add as Attributes (Regex)") + .description("A Regular Expression that is matched against all message headers. " + + "Any message header whose name matches the regex will be added to the FlowFile as an Attribute. " + + "If not specified, no Header values will be added as FlowFile attributes. If two messages have a different value for the same header and that header is selected by " + + "the provided regex, then those two messages must be added to different FlowFiles. As a result, users should be cautious about using a regex like " + + "\".*\" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling " + + "the messages together efficiently.") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .expressionLanguageSupported(false) + .required(false) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles received from Kafka. 
Depending on demarcation strategy it is a flow file per message or a bundle of messages grouped by topic and partition.") + .build(); + static final Relationship REL_PARSE_FAILURE = new Relationship.Builder() + .name("parse.failure") + .description("If a message from Kafka cannot be parsed using the configured Record Reader, the contents of the " + + "message will be routed to this Relationship as its own individual FlowFile.") + .build(); + + static final List<PropertyDescriptor> DESCRIPTORS; + static final Set<Relationship> RELATIONSHIPS; + + private volatile ConsumerPool consumerPool = null; + private final Set<ConsumerLease> activeLeases = Collections.synchronizedSet(new HashSet<>()); + + static { + List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.add(KafkaProcessorUtils.BOOTSTRAP_SERVERS); + descriptors.add(TOPICS); + descriptors.add(TOPIC_TYPE); + descriptors.add(RECORD_READER); + descriptors.add(RECORD_WRITER); + descriptors.add(HONOR_TRANSACTIONS); + descriptors.add(KafkaProcessorUtils.SECURITY_PROTOCOL); + descriptors.add(KafkaProcessorUtils.KERBEROS_PRINCIPLE); + descriptors.add(KafkaProcessorUtils.USER_PRINCIPAL); + descriptors.add(KafkaProcessorUtils.USER_KEYTAB); + descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE); + descriptors.add(GROUP_ID); + descriptors.add(AUTO_OFFSET_RESET); + descriptors.add(MESSAGE_HEADER_ENCODING); + descriptors.add(HEADER_NAME_REGEX); + descriptors.add(MAX_POLL_RECORDS); + descriptors.add(MAX_UNCOMMITTED_TIME); + DESCRIPTORS = Collections.unmodifiableList(descriptors); + + final Set<Relationship> rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_PARSE_FAILURE); + RELATIONSHIPS = Collections.unmodifiableSet(rels); + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + @OnStopped + public void close() { + final ConsumerPool pool = consumerPool; + consumerPool = null; + + if (pool != null) { + pool.close(); + } + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") + .name(propertyDescriptorName).addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class)).dynamic(true) + .build(); + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + return KafkaProcessorUtils.validateCommonProperties(validationContext); + } + + private synchronized ConsumerPool getConsumerPool(final ProcessContext context) { + ConsumerPool pool = consumerPool; + if (pool != null) { + return pool; + } + + return consumerPool = createConsumerPool(context, getLogger()); + } + + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + final int maxLeases = context.getMaxConcurrentTasks(); + final long maxUncommittedTime = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS); + + final Map<String, Object> props = new HashMap<>(); + KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName()); + final String topicListing = context.getProperty(ConsumeKafkaRecord_1_0.TOPICS).evaluateAttributeExpressions().getValue(); + final String topicType = context.getProperty(ConsumeKafkaRecord_1_0.TOPIC_TYPE).evaluateAttributeExpressions().getValue(); + final List<String> topics = new ArrayList<>(); + final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(); + final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); + + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final boolean honorTransactions = context.getProperty(HONOR_TRANSACTIONS).asBoolean(); + + final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue(); + final Charset charset = Charset.forName(charsetName); + + final String headerNameRegex = context.getProperty(HEADER_NAME_REGEX).getValue(); + final Pattern headerNamePattern = headerNameRegex == null ? null : Pattern.compile(headerNameRegex); + + if (topicType.equals(TOPIC_NAME.getValue())) { + for (final String topic : topicListing.split(",", 100)) { + final String trimmedName = topic.trim(); + if (!trimmedName.isEmpty()) { + topics.add(trimmedName); + } + } + + return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topics, maxUncommittedTime, securityProtocol, + bootstrapServers, log, honorTransactions, charset, headerNamePattern); + } else if (topicType.equals(TOPIC_PATTERN.getValue())) { + final Pattern topicPattern = Pattern.compile(topicListing.trim()); + return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topicPattern, maxUncommittedTime, securityProtocol, + bootstrapServers, log, honorTransactions, charset, headerNamePattern); + } else { + getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType}); + return null; + } + } + + @OnUnscheduled + public void interruptActiveThreads() { + // There are known issues with the Kafka client library that result in the client code hanging + // indefinitely when unable to communicate with the broker. In order to address this, we will wait + // up to 30 seconds for the Threads to finish and then will call Consumer.wakeup() to trigger the + // thread to wakeup when it is blocked, waiting on a response. 
+ final long nanosToWait = TimeUnit.SECONDS.toNanos(5L); + final long start = System.nanoTime(); + while (System.nanoTime() - start < nanosToWait && !activeLeases.isEmpty()) { + try { + Thread.sleep(100L); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } + } + + if (!activeLeases.isEmpty()) { + int count = 0; + for (final ConsumerLease lease : activeLeases) { + getLogger().info("Consumer {} has not finished after waiting 30 seconds; will attempt to wake-up the lease", new Object[] {lease}); + lease.wakeup(); + count++; + } + + getLogger().info("Woke up {} consumers", new Object[] {count}); + } + + activeLeases.clear(); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final ConsumerPool pool = getConsumerPool(context); + if (pool == null) { + context.yield(); + return; + } + + try (final ConsumerLease lease = pool.obtainConsumer(session, context)) { + if (lease == null) { + context.yield(); + return; + } + + activeLeases.add(lease); + try { + while (this.isScheduled() && lease.continuePolling()) { + lease.poll(); + } + if (this.isScheduled() && !lease.commit()) { + context.yield(); + } + } catch (final WakeupException we) { + getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. " + + "Will roll back session and discard any partially received data.", new Object[] {lease}); + } catch (final KafkaException kex) { + getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", + new Object[]{lease, kex}, kex); + } catch (final Throwable t) { + getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}", + new Object[]{lease, t}, t); + } finally { + activeLeases.remove(lease); + } + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java new file mode 100644 index 0000000..fdc2cb3 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING; +import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING; + +@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 1.0 Consumer API. " + + "The complementary NiFi processor for sending messages is PublishKafka_1_0.") +@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "1.0"}) +@WritesAttributes({ + @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"), + @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. " + + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."), + @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."), + @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"), + @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", + description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." + + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged." 
+ + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ") +public class ConsumeKafka_1_0 extends AbstractProcessor { + + static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset"); + + static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset"); + + static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group"); + + static final AllowableValue TOPIC_NAME = new AllowableValue("names", "names", "Topic is a full topic name or comma separated list of names"); + + static final AllowableValue TOPIC_PATTERN = new AllowableValue("pattern", "pattern", "Topic is a regex using the Java Pattern syntax"); + + static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder() + .name("topic") + .displayName("Topic Name(s)") + .description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + static final PropertyDescriptor TOPIC_TYPE = new PropertyDescriptor.Builder() + .name("topic_type") + .displayName("Topic Name Format") + .description("Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression") + .required(true) + .allowableValues(TOPIC_NAME, TOPIC_PATTERN) + .defaultValue(TOPIC_NAME.getValue()) + .build(); + + static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder() + .name(ConsumerConfig.GROUP_ID_CONFIG) + .displayName("Group ID") + .description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + + static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder() + .name(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) + .displayName("Offset Reset") + .description("Allows you to manage the condition when there is no initial offset in Kafka or if the current offset does not exist any " + + "more on the server (e.g. because that data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.") + .required(true) + .allowableValues(OFFSET_EARLIEST, OFFSET_LATEST, OFFSET_NONE) + .defaultValue(OFFSET_LATEST.getValue()) + .build(); + + static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder() + .name("key-attribute-encoding") + .displayName("Key Attribute Encoding") + .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. 
This property dictates how the value of the attribute should be encoded.") + .required(true) + .defaultValue(UTF8_ENCODING.getValue()) + .allowableValues(UTF8_ENCODING, HEX_ENCODING) + .build(); + + static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() + .name("message-demarcator") + .displayName("Message Demarcator") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .description("Since KafkaConsumer receives messages in batches, you have an option to output FlowFiles which contain " + + "all Kafka messages in a single batch for a given topic and partition, and this property allows you to provide a string (interpreted as UTF-8) to use " + + "for demarcating multiple Kafka messages. This is an optional property and if not provided each Kafka message received " + + "will result in a single FlowFile. " + + "To enter a special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS") + .build(); + static final PropertyDescriptor HEADER_NAME_REGEX = new PropertyDescriptor.Builder() + .name("header-name-regex") + .displayName("Headers to Add as Attributes (Regex)") + .description("A Regular Expression that is matched against all message headers. " + + "Any message header whose name matches the regex will be added to the FlowFile as an Attribute. " + + "If not specified, no Header values will be added as FlowFile attributes. If two messages have a different value for the same header and that header is selected by " + + "the provided regex, then those two messages must be added to different FlowFiles. As a result, users should be cautious about using a regex like " + + "\".*\" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling " + + "the messages together efficiently.") + .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .expressionLanguageSupported(false) + .required(false) + .build(); + + static final PropertyDescriptor MAX_POLL_RECORDS = new PropertyDescriptor.Builder() + .name("max.poll.records") + .displayName("Max Poll Records") + .description("Specifies the maximum number of records Kafka should return in a single poll.") + .required(false) + .defaultValue("10000") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor MAX_UNCOMMITTED_TIME = new PropertyDescriptor.Builder() + .name("max-uncommit-offset-wait") + .displayName("Max Uncommitted Time") + .description("Specifies the maximum amount of time allowed to pass before offsets must be committed. " + + "This value impacts how often offsets will be committed. Committing offsets less often increases " + + "throughput but also increases the window of potential data duplication in the event of a rebalance " + + "or JVM restart between commits. This value is also related to maximum poll records and the use " + + "of a message demarcator. 
When using a message demarcator, far more messages can remain uncommitted " + + "than when one is not used, because there is much less to keep track of in memory.") + .required(false) + .defaultValue("1 secs") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + static final PropertyDescriptor HONOR_TRANSACTIONS = new PropertyDescriptor.Builder() + .name("honor-transactions") + .displayName("Honor Transactions") + .description("Specifies whether or not NiFi should honor transactional guarantees when communicating with Kafka. If false, the Processor will use an \"isolation level\" of " + + "read_uncommitted. This means that messages will be received as soon as they are written to Kafka and will be pulled even if the producer cancels its transactions. If " + + "this value is true, NiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer must wait " + + "for the producer to finish its entire transaction instead of pulling as the messages become available.") + .expressionLanguageSupported(false) + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .build(); + static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new PropertyDescriptor.Builder() + .name("message-header-encoding") + .displayName("Message Header Encoding") + .description("Any message header that is found on a Kafka message will be added to the outbound FlowFile as an attribute. " + + "This property indicates the Character Encoding to use for deserializing the headers.") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue("UTF-8") + .required(false) + .build(); + + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles received from Kafka. 
Depending on demarcation strategy it is a flow file per message or a bundle of messages grouped by topic and partition.") + .build(); + + static final List<PropertyDescriptor> DESCRIPTORS; + static final Set<Relationship> RELATIONSHIPS; + + private volatile ConsumerPool consumerPool = null; + private final Set<ConsumerLease> activeLeases = Collections.synchronizedSet(new HashSet<>()); + + static { + List<PropertyDescriptor> descriptors = new ArrayList<>(); + descriptors.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors()); + descriptors.add(TOPICS); + descriptors.add(TOPIC_TYPE); + descriptors.add(HONOR_TRANSACTIONS); + descriptors.add(GROUP_ID); + descriptors.add(AUTO_OFFSET_RESET); + descriptors.add(KEY_ATTRIBUTE_ENCODING); + descriptors.add(MESSAGE_DEMARCATOR); + descriptors.add(MESSAGE_HEADER_ENCODING); + descriptors.add(HEADER_NAME_REGEX); + descriptors.add(MAX_POLL_RECORDS); + descriptors.add(MAX_UNCOMMITTED_TIME); + DESCRIPTORS = Collections.unmodifiableList(descriptors); + RELATIONSHIPS = Collections.singleton(REL_SUCCESS); + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + @OnStopped + public void close() { + final ConsumerPool pool = consumerPool; + consumerPool = null; + if (pool != null) { + pool.close(); + } + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") + .name(propertyDescriptorName).addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class)).dynamic(true) + .build(); + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + return KafkaProcessorUtils.validateCommonProperties(validationContext); + } + + private synchronized ConsumerPool getConsumerPool(final ProcessContext context) { + ConsumerPool pool = consumerPool; + if (pool != null) { + return pool; + } + + return consumerPool = createConsumerPool(context, getLogger()); + } + + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + final int maxLeases = context.getMaxConcurrentTasks(); + final long maxUncommittedTime = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS); + final byte[] demarcator = context.getProperty(ConsumeKafka_1_0.MESSAGE_DEMARCATOR).isSet() + ? 
context.getProperty(ConsumeKafka_1_0.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8) + : null; + final Map<String, Object> props = new HashMap<>(); + KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + + final String topicListing = context.getProperty(ConsumeKafka_1_0.TOPICS).evaluateAttributeExpressions().getValue(); + final String topicType = context.getProperty(ConsumeKafka_1_0.TOPIC_TYPE).evaluateAttributeExpressions().getValue(); + final List<String> topics = new ArrayList<>(); + final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue(); + final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(); + final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); + final boolean honorTransactions = context.getProperty(HONOR_TRANSACTIONS).asBoolean(); + + final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue(); + final Charset charset = Charset.forName(charsetName); + + final String headerNameRegex = context.getProperty(HEADER_NAME_REGEX).getValue(); + final Pattern headerNamePattern = headerNameRegex == null ? null : Pattern.compile(headerNameRegex); + + if (topicType.equals(TOPIC_NAME.getValue())) { + for (final String topic : topicListing.split(",", 100)) { + final String trimmedName = topic.trim(); + if (!trimmedName.isEmpty()) { + topics.add(trimmedName); + } + } + + return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, + bootstrapServers, log, honorTransactions, charset, headerNamePattern); + } else if (topicType.equals(TOPIC_PATTERN.getValue())) { + final Pattern topicPattern = Pattern.compile(topicListing.trim()); + return new ConsumerPool(maxLeases, demarcator, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol, + bootstrapServers, log, honorTransactions, charset, headerNamePattern); + } else { + getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType}); + return null; + } + } + + @OnUnscheduled + public void interruptActiveThreads() { + // There are known issues with the Kafka client library that result in the client code hanging + // indefinitely when unable to communicate with the broker. In order to address this, we will wait + // up to 30 seconds for the Threads to finish and then will call Consumer.wakeup() to trigger the + // thread to wakeup when it is blocked, waiting on a response. 
+ final long nanosToWait = TimeUnit.SECONDS.toNanos(5L); + final long start = System.nanoTime(); + while (System.nanoTime() - start < nanosToWait && !activeLeases.isEmpty()) { + try { + Thread.sleep(100L); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } + } + + if (!activeLeases.isEmpty()) { + int count = 0; + for (final ConsumerLease lease : activeLeases) { + getLogger().info("Consumer {} has not finished after waiting 30 seconds; will attempt to wake-up the lease", new Object[] {lease}); + lease.wakeup(); + count++; + } + + getLogger().info("Woke up {} consumers", new Object[] {count}); + } + + activeLeases.clear(); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final ConsumerPool pool = getConsumerPool(context); + if (pool == null) { + context.yield(); + return; + } + + try (final ConsumerLease lease = pool.obtainConsumer(session, context)) { + if (lease == null) { + context.yield(); + return; + } + + activeLeases.add(lease); + try { + while (this.isScheduled() && lease.continuePolling()) { + lease.poll(); + } + if (this.isScheduled() && !lease.commit()) { + context.yield(); + } + } catch (final WakeupException we) { + getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. " + + "Will roll back session and discard any partially received data.", new Object[] {lease}); + } catch (final KafkaException kex) { + getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", + new Object[]{lease, kex}, kex); + } catch (final Throwable t) { + getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}", + new Object[]{lease, t}, t); + } finally { + activeLeases.remove(lease); + } + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java new file mode 100644 index 0000000..a2a449c --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -0,0 +1,701 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kafka.pubsub; + +import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0.REL_PARSE_FAILURE; +import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0.REL_SUCCESS; +import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING; +import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING; + +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import javax.xml.bind.DatatypeConverter; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; + +/** + * This class represents a lease to access a Kafka Consumer object. The lease is + * intended to be obtained from a ConsumerPool. The lease is closeable to allow + * for the clean model of a try w/resources whereby non-exceptional cases mean + * the lease will be returned to the pool for future use by others. A given + * lease may only belong to a single thread at a time. 
+ */ +public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener { + + private final long maxWaitMillis; + private final Consumer<byte[], byte[]> kafkaConsumer; + private final ComponentLog logger; + private final byte[] demarcatorBytes; + private final String keyEncoding; + private final String securityProtocol; + private final String bootstrapServers; + private final RecordSetWriterFactory writerFactory; + private final RecordReaderFactory readerFactory; + private final Charset headerCharacterSet; + private final Pattern headerNamePattern; + private boolean poisoned = false; + //used for tracking demarcated flowfiles to their TopicPartition so we can append + //to them on subsequent poll calls + private final Map<BundleInformation, BundleTracker> bundleMap = new HashMap<>(); + private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<>(); + private long leaseStartNanos = -1; + private boolean lastPollEmpty = false; + private int totalMessages = 0; + + ConsumerLease( + final long maxWaitMillis, + final Consumer<byte[], byte[]> kafkaConsumer, + final byte[] demarcatorBytes, + final String keyEncoding, + final String securityProtocol, + final String bootstrapServers, + final RecordReaderFactory readerFactory, + final RecordSetWriterFactory writerFactory, + final ComponentLog logger, + final Charset headerCharacterSet, + final Pattern headerNamePattern) { + this.maxWaitMillis = maxWaitMillis; + this.kafkaConsumer = kafkaConsumer; + this.demarcatorBytes = demarcatorBytes; + this.keyEncoding = keyEncoding; + this.securityProtocol = securityProtocol; + this.bootstrapServers = bootstrapServers; + this.readerFactory = readerFactory; + this.writerFactory = writerFactory; + this.logger = logger; + this.headerCharacterSet = headerCharacterSet; + this.headerNamePattern = headerNamePattern; + } + + /** + * clears out internal state elements excluding session and consumer as + * those are managed by the pool itself + */ + private void resetInternalState() { + bundleMap.clear(); + uncommittedOffsetsMap.clear(); + leaseStartNanos = -1; + lastPollEmpty = false; + totalMessages = 0; + } + + /** + * Kafka will call this method whenever it is about to rebalance the + * consumers for the given partitions. We'll simply take this to mean that + * we need to quickly commit what we've got and will return the consumer to + * the pool. This method will be called during the poll() method call of + * this class and will be called by the same thread calling poll according + * to the Kafka API docs. After this method executes the session and Kafka + * offsets are committed and this lease is closed. + * + * @param partitions partitions being reassigned + */ + @Override + public void onPartitionsRevoked(final Collection<TopicPartition> partitions) { + logger.debug("Rebalance Alert: Partitions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); + //force a commit here. Can reuse the session and consumer after this but must commit now to avoid duplicates if Kafka reassigns the partition + commit(); + } + + /** + * This will be called by Kafka when the rebalance has completed. We don't + * need to do anything with this information other than optionally log it, as + * by this point we've committed what we've got and moved on. 
+ * + * @param partitions topic partition set being reassigned + */ + @Override + public void onPartitionsAssigned(final Collection<TopicPartition> partitions) { + logger.debug("Rebalance Alert: Partitions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); + } + + /** + * Executes a poll on the underlying Kafka Consumer and creates any new + * flowfiles necessary or appends to existing ones if in demarcation mode. + */ + void poll() { + /** + * Implementation note: + * Even if ConsumeKafka is not scheduled to poll, due to downstream connection back-pressure being engaged, + * for longer than session.timeout.ms (defaults to 10 sec), the Kafka consumer sends heartbeats from a background thread. + * If this situation lasts longer than max.poll.interval.ms (defaults to 5 min), the Kafka consumer sends a + * Leave Group request to the Group Coordinator. When the ConsumeKafka processor is scheduled again, the Kafka client checks + * if this client instance is still a part of the consumer group. If not, it rejoins before polling messages. + * This behavior has been fixed via Kafka KIP-62 and is available from Kafka client 0.10.1.0. + */ + try { + final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10); + lastPollEmpty = records.count() == 0; + processRecords(records); + } catch (final ProcessException pe) { + throw pe; + } catch (final Throwable t) { + this.poison(); + throw t; + } + } + + /** + * Notifies Kafka to commit the offsets for the specified topic/partition + * pairs to the specified offsets with the given metadata. This can offer + * higher performance than the other commitOffsets call as it allows the + * Kafka client to collect more data from Kafka before committing the + * offsets. + * + * @return false if we didn't commit anything and the caller should probably yield; + * true if we committed new data + * + */ + boolean commit() { + if (uncommittedOffsetsMap.isEmpty()) { + resetInternalState(); + return false; + } + try { + /** + * Committing the NiFi session then the offsets means we have an at + * least once guarantee here. If we reversed the order we'd have at + * most once. + */ + final Collection<FlowFile> bundledFlowFiles = getBundles(); + if (!bundledFlowFiles.isEmpty()) { + getProcessSession().transfer(bundledFlowFiles, REL_SUCCESS); + } + getProcessSession().commit(); + + final Map<TopicPartition, OffsetAndMetadata> offsetsMap = uncommittedOffsetsMap; + kafkaConsumer.commitSync(offsetsMap); + resetInternalState(); + return true; + } catch (final IOException ioe) { + poison(); + logger.error("Failed to finish writing out FlowFile bundle", ioe); + throw new ProcessException(ioe); + } catch (final KafkaException kex) { + poison(); + logger.warn("Duplicates are likely as we were able to commit the process" + + " session but received an exception from Kafka while committing" + + " offsets."); + throw kex; + } catch (final Throwable t) { + poison(); + throw t; + } + } + + /** + * Indicates whether we should continue polling for data. If we are not + * writing data with a demarcator then we're writing individual flow files + * per Kafka message, therefore we must be very mindful of memory usage for + * the flow file objects (not their content) being held in memory. The + * content of Kafka messages will be written to the content repository + * immediately upon each poll call but we must still be mindful of how much + * memory can be used in each poll call. 
We will indicate that we should + * stop polling if our last poll call produced no new results, if we've been + * polling and processing data longer than the specified maximum polling + * time, if we have reached our specified max flow file limit, or if a + * rebalance has been initiated for one of the partitions we're watching; + * otherwise true. + * + * @return true if we should keep polling; false otherwise + */ + boolean continuePolling() { + //stop if the last poll produced no new data + if (lastPollEmpty) { + return false; + } + + //stop if we've gone past our desired max uncommitted wait time + if (leaseStartNanos < 0) { + leaseStartNanos = System.nanoTime(); + } + final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos); + if (durationMillis > maxWaitMillis) { + return false; + } + + //stop if we've generated enough flowfiles that we need to be concerned about memory usage for the objects + if (bundleMap.size() > 200) { //a magic number - the number of simultaneous bundles to track + return false; + } else { + return totalMessages < 1000;//admittedly a magic number - a good candidate for a processor property + } + } + + /** + * Indicates that the underlying session and consumer should be immediately + * considered invalid. Once closed the session will be rolled back and the + * pool should destroy the underlying consumer. This is useful if, due to + * external reasons such as the processor no longer being scheduled, this + * lease should be terminated immediately. + */ + private void poison() { + poisoned = true; + } + + /** + * @return true if this lease has been poisoned; false otherwise + */ + boolean isPoisoned() { + return poisoned; + } + + /** + * Trigger the consumer's {@link KafkaConsumer#wakeup() wakeup()} method. + */ + public void wakeup() { + kafkaConsumer.wakeup(); + } + + /** + * Abstract method that is intended to be extended by the pool that created + * this ConsumerLease object. It should ensure that the session used to + * create this lease is rolled back and that the underlying Kafka consumer + * is either returned to the pool for continued use or destroyed if this + * lease has been poisoned. It can only be called once. Calling it more than + * once can result in undefined and non-threadsafe behavior. 
+ */
+ @Override
+ public void close() {
+ resetInternalState();
+ }
+
+ public abstract ProcessSession getProcessSession();
+
+ public abstract void yield();
+
+ private void processRecords(final ConsumerRecords<byte[], byte[]> records) {
+ records.partitions().stream().forEach(partition -> {
+ List<ConsumerRecord<byte[], byte[]>> messages = records.records(partition);
+ if (!messages.isEmpty()) {
+ //update maximum offset map for this topic partition
+ long maxOffset = messages.stream()
+ .mapToLong(record -> record.offset())
+ .max()
+ .getAsLong();
+
+ //write records to content repository and session
+ if (demarcatorBytes != null) {
+ writeDemarcatedData(getProcessSession(), messages, partition);
+ } else if (readerFactory != null && writerFactory != null) {
+ writeRecordData(getProcessSession(), messages, partition);
+ } else {
+ messages.stream().forEach(message -> {
+ writeData(getProcessSession(), message, partition);
+ });
+ }
+
+ totalMessages += messages.size();
+ uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
+ }
+ });
+ }
+
+ private static String encodeKafkaKey(final byte[] key, final String encoding) {
+ if (key == null) {
+ return null;
+ }
+
+ if (HEX_ENCODING.getValue().equals(encoding)) {
+ return DatatypeConverter.printHexBinary(key);
+ } else if (UTF8_ENCODING.getValue().equals(encoding)) {
+ return new String(key, StandardCharsets.UTF_8);
+ } else {
+ return null; // won't happen because it is guaranteed by the Allowable Values
+ }
+ }
+
+ private Collection<FlowFile> getBundles() throws IOException {
+ final List<FlowFile> flowFiles = new ArrayList<>();
+ for (final BundleTracker tracker : bundleMap.values()) {
+ final boolean includeBundle = processBundle(tracker);
+ if (includeBundle) {
+ flowFiles.add(tracker.flowFile);
+ }
+ }
+ return flowFiles;
+ }
+
+ private boolean processBundle(final BundleTracker bundle) throws IOException {
+ final RecordSetWriter writer = bundle.recordWriter;
+ if (writer != null) {
+ final WriteResult writeResult;
+
+ try {
+ writeResult = writer.finishRecordSet();
+ } finally {
+ writer.close();
+ }
+
+ if (writeResult.getRecordCount() == 0) {
+ getProcessSession().remove(bundle.flowFile);
+ return false;
+ }
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.putAll(writeResult.getAttributes());
+ attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+
+ bundle.flowFile = getProcessSession().putAllAttributes(bundle.flowFile, attributes);
+ }
+
+ populateAttributes(bundle);
+ return true;
+ }
+
+ private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
+ FlowFile flowFile = session.create();
+ final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
+ tracker.incrementRecordCount(1);
+ final byte[] value = record.value();
+ if (value != null) {
+ flowFile = session.write(flowFile, out -> {
+ out.write(value);
+ });
+ }
+ flowFile = session.putAllAttributes(flowFile, getAttributes(record));
+ tracker.updateFlowFile(flowFile);
+ populateAttributes(tracker);
+ session.transfer(tracker.flowFile, REL_SUCCESS);
+ }
+
+ private void writeDemarcatedData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
+ // Group the Records by their BundleInformation
+ final Map<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> map = records.stream()
+ .collect(Collectors.groupingBy(rec -> new BundleInformation(topicPartition, null, getAttributes(rec))));
+
+ for (final Map.Entry<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> entry : map.entrySet()) {
+ final BundleInformation bundleInfo = entry.getKey();
+ final List<ConsumerRecord<byte[], byte[]>> recordList = entry.getValue();
+
+ final boolean demarcateFirstRecord;
+
+ BundleTracker tracker = bundleMap.get(bundleInfo);
+
+ FlowFile flowFile;
+ if (tracker == null) {
+ tracker = new BundleTracker(recordList.get(0), topicPartition, keyEncoding);
+ flowFile = session.create();
+ flowFile = session.putAllAttributes(flowFile, bundleInfo.attributes);
+ tracker.updateFlowFile(flowFile);
+ demarcateFirstRecord = false; //have not yet written records for this topic/partition in this lease
+ } else {
+ demarcateFirstRecord = true; //have already been writing records for this topic/partition in this lease
+ }
+ flowFile = tracker.flowFile;
+
+ tracker.incrementRecordCount(recordList.size());
+ flowFile = session.append(flowFile, out -> {
+ boolean useDemarcator = demarcateFirstRecord;
+ for (final ConsumerRecord<byte[], byte[]> record : recordList) {
+ if (useDemarcator) {
+ out.write(demarcatorBytes);
+ }
+ final byte[] value = record.value();
+ if (value != null) {
+ out.write(record.value());
+ }
+ useDemarcator = true;
+ }
+ });
+
+ tracker.updateFlowFile(flowFile);
+ bundleMap.put(bundleInfo, tracker);
+ }
+ }
+
+ private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause) {
+ handleParseFailure(consumerRecord, session, cause, "Failed to parse message from Kafka using the configured Record Reader. "
+ + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+ }
+
+ private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause, final String message) {
+ // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
+ final Map<String, String> attributes = getAttributes(consumerRecord);
+ attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+ attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
+ attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());
+
+ FlowFile failureFlowFile = session.create();
+
+ final byte[] value = consumerRecord.value();
+ if (value != null) {
+ failureFlowFile = session.write(failureFlowFile, out -> out.write(value));
+ }
+ failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
+
+ final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic());
+ session.getProvenanceReporter().receive(failureFlowFile, transitUri);
+
+ session.transfer(failureFlowFile, REL_PARSE_FAILURE);
+
+ if (cause == null) {
+ logger.error(message);
+ } else {
+ logger.error(message, cause);
+ }
+
+ session.adjustCounter("Parse Failures", 1, false);
+ }
+
+ private Map<String, String> getAttributes(final ConsumerRecord<?, ?> consumerRecord) {
+ final Map<String, String> attributes = new HashMap<>();
+ if (headerNamePattern == null) {
+ return attributes;
+ }
+
+ for (final Header header : consumerRecord.headers()) {
+ final String attributeName = header.key();
+ if (headerNamePattern.matcher(attributeName).matches()) {
+ attributes.put(attributeName, new String(header.value(), headerCharacterSet));
+ }
+ }
+
+ return attributes;
+ }
+
+ private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
+ // In order to obtain a RecordReader from the RecordReaderFactory, we need to give it a FlowFile.
+ // We don't want to create a new FlowFile for each record that we receive, so we will just create
+ // a "temporary flowfile" that will be removed in the finally block below and use that to pass to
+ // the createRecordReader method.
+ RecordSetWriter writer = null;
+ try {
+ for (final ConsumerRecord<byte[], byte[]> consumerRecord : records) {
+ final Map<String, String> attributes = getAttributes(consumerRecord);
+
+ final byte[] recordBytes = consumerRecord.value() == null ? new byte[0] : consumerRecord.value();
+ try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
+ final RecordReader reader;
+
+ try {
+ reader = readerFactory.createRecordReader(attributes, in, logger);
+ } catch (final Exception e) {
+ handleParseFailure(consumerRecord, session, e);
+ continue;
+ }
+
+ Record record;
+ while ((record = reader.nextRecord()) != null) {
+ // Determine the bundle for this record.
+ final RecordSchema recordSchema = record.getSchema();
+ final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);
+
+ BundleTracker tracker = bundleMap.get(bundleInfo);
+ if (tracker == null) {
+ FlowFile flowFile = session.create();
+ flowFile = session.putAllAttributes(flowFile, attributes);
+
+ final OutputStream rawOut = session.write(flowFile);
+
+ final RecordSchema writeSchema;
+ try {
+ writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+ } catch (final Exception e) {
+ logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+
+ try {
+ rollback(topicPartition);
+ } catch (final Exception rollbackException) {
+ logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+ }
+
+ yield();
+ throw new ProcessException(e);
+ }
+
+ writer = writerFactory.createWriter(logger, writeSchema, rawOut);
+ writer.beginRecordSet();
+
+ tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
+ tracker.updateFlowFile(flowFile);
+ bundleMap.put(bundleInfo, tracker);
+ } else {
+ writer = tracker.recordWriter;
+ }
+
+ try {
+ writer.write(record);
+ } catch (final RuntimeException re) {
+ handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+ + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+ continue;
+ }
+
+ tracker.incrementRecordCount(1L);
+ session.adjustCounter("Records Received", records.size(), false);
+ }
+ }
+ }
+ } catch (final Exception e) {
+ logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);
+
+ try {
+ if (writer != null) {
+ writer.close();
+ }
+ } catch (final Exception ioe) {
+ logger.warn("Failed to close Record Writer", ioe);
+ }
+
+ try {
+ rollback(topicPartition);
+ } catch (final Exception rollbackException) {
+ logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+ }
+
+ throw new ProcessException(e);
+ }
+ }
+
+
+ private void rollback(final TopicPartition topicPartition) {
+ OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
+ if (offsetAndMetadata == null) {
+ offsetAndMetadata = kafkaConsumer.committed(topicPartition);
+ }
+
+ final long offset = offsetAndMetadata.offset();
+ kafkaConsumer.seek(topicPartition, offset);
+ }
+
+
+
+ private void populateAttributes(final BundleTracker tracker) {
+ final Map<String, String> kafkaAttrs = new HashMap<>();
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
+ if (tracker.key != null && tracker.totalRecords == 1) {
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
+ }
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
+ if (tracker.totalRecords > 1) {
+ // Add a record.count attribute to remain consistent with other record-oriented processors. If not
+ // reading/writing records, then use "kafka.count" attribute.
+ if (tracker.recordWriter == null) {
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
+ } else {
+ kafkaAttrs.put("record.count", String.valueOf(tracker.totalRecords));
+ }
+ }
+ final FlowFile newFlowFile = getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
+ final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
+ final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, tracker.topic);
+ getProcessSession().getProvenanceReporter().receive(newFlowFile, transitUri, executionDurationMillis);
+ tracker.updateFlowFile(newFlowFile);
+ }
+
+ private static class BundleTracker {
+
+ final long initialOffset;
+ final int partition;
+ final String topic;
+ final String key;
+ final RecordSetWriter recordWriter;
+ FlowFile flowFile;
+ long totalRecords = 0;
+
+ private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding) {
+ this(initialRecord, topicPartition, keyEncoding, null);
+ }
+
+ private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding, final RecordSetWriter recordWriter) {
+ this.initialOffset = initialRecord.offset();
+ this.partition = topicPartition.partition();
+ this.topic = topicPartition.topic();
+ this.recordWriter = recordWriter;
+ this.key = encodeKafkaKey(initialRecord.key(), keyEncoding);
+ }
+
+ private void incrementRecordCount(final long count) {
+ totalRecords += count;
+ }
+
+ private void updateFlowFile(final FlowFile flowFile) {
+ this.flowFile = flowFile;
+ }
+
+ }
+
+ private static class BundleInformation {
+ private final TopicPartition topicPartition;
+ private final RecordSchema schema;
+ private final Map<String, String> attributes;
+
+ public BundleInformation(final TopicPartition topicPartition, final RecordSchema schema, final Map<String, String> attributes) {
+ this.topicPartition = topicPartition;
+ this.schema = schema;
+ this.attributes = attributes;
+ }
+
+ @Override
+ public int hashCode() {
+ return 41 + 13 * topicPartition.hashCode() + ((schema == null) ? 0 : 13 * schema.hashCode()) + ((attributes == null) ? 0 : 13 * attributes.hashCode());
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof BundleInformation)) {
+ return false;
+ }
+
+ final BundleInformation other = (BundleInformation) obj;
+ return Objects.equals(topicPartition, other.topicPartition) && Objects.equals(schema, other.schema) && Objects.equals(attributes, other.attributes);
+ }
+ }
+}
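
For context on how the lease above is meant to be driven, the following is a minimal sketch (not part of this patch) of the onTrigger-style loop a ConsumeKafka processor would run against it: obtain a lease, poll while continuePolling() allows it, then commit. The pool field and its obtainConsumer(session, context) factory method, as well as the surrounding processor scaffolding, are assumptions for illustration only and are not code from this commit.

    // Illustrative fragment only; assumes a ConsumerPool field named "pool" with an
    // obtainConsumer(ProcessSession, ProcessContext) method, which is not shown here.
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        try (final ConsumerLease lease = pool.obtainConsumer(session, context)) {
            if (lease == null) {
                // no consumer available right now
                context.yield();
                return;
            }
            // Keep polling until the lease signals that the last poll was empty, the max
            // uncommitted wait time elapsed, or the in-memory bundle limits were reached.
            while (lease.continuePolling()) {
                lease.poll();
            }
            // Transfer bundles, commit the NiFi session, then commit Kafka offsets
            // (at-least-once). Yield if nothing was committed.
            if (!lease.commit()) {
                context.yield();
            }
        } catch (final KafkaException | ProcessException e) {
            getLogger().error("Failed to consume from Kafka", e);
        }
    }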

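
The implementation note inside poll() refers to the interplay of session.timeout.ms and max.poll.interval.ms introduced by KIP-62. As a self-contained sketch of those two settings on a plain kafka-clients consumer (broker address, group id, and topic name below are placeholders, not values from this patch):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class TimeoutDemo {
        public static void main(String[] args) {
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "timeout-demo");            // placeholder group id
            // With KIP-62, liveness is governed by two settings: background heartbeats must
            // arrive within session.timeout.ms, while calls to poll() must occur within
            // max.poll.interval.ms or the consumer leaves the group.
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

            try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("demo-topic"));      // placeholder topic
                final ConsumerRecords<byte[], byte[]> records = consumer.poll(10);
                System.out.println("Polled " + records.count() + " records");
            }
        }
    }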