http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..3aa101a --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,72 @@ +nifi-kafka-nar +Copyright 2014-2016 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2014 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Apache Kafka + The following NOTICE information applies: + Apache Kafka + Copyright 2012 The Apache Software Foundation. + + (ASLv2) Yammer Metrics + The following NOTICE information applies: + Metrics + Copyright 2010-2012 Coda Hale and Yammer, Inc. + + This product includes software developed by Coda Hale and Yammer, Inc. 
+ + This product includes code derived from the JSR-166 project (ThreadLocalRandom), which was released + with the following comments: + + Written by Doug Lea with assistance from members of JCP JSR-166 + Expert Group and released to the public domain, as explained at + http://creativecommons.org/publicdomain/zero/1.0/ + + (ASLv2) Snappy Java + The following NOTICE information applies: + This product includes software developed by Google + Snappy: http://code.google.com/p/snappy/ (New BSD License) + + This product includes software developed by Apache + PureJavaCrc32C from apache-hadoop-common http://hadoop.apache.org/ + (Apache 2.0 license) + + This library containd statically linked libstdc++. This inclusion is allowed by + "GCC RUntime Library Exception" + http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html + + (ASLv2) Apache ZooKeeper + The following NOTICE information applies: + Apache ZooKeeper + Copyright 2009-2012 The Apache Software Foundation + +************************ +Common Development and Distribution License 1.1 +************************ + +The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details. + + (CDDL 1.1) (GPL2 w/ CPE) JavaMail API (compat) (javax.mail:mail:jar:1.4.7 - http://kenai.com/projects/javamail/mail) + +************************ +Common Development and Distribution License 1.0 +************************ + +The following binary components are provided under the Common Development and Distribution License 1.0. See project link for details. + + (CDDL 1.0) JavaBeans Activation Framework (JAF) (javax.activation:activation:jar:1.1 - http://java.sun.com/products/javabeans/jaf/index.jsp)
http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml new file mode 100644 index 0000000..17b5d2b --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml @@ -0,0 +1,79 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kafka-bundle</artifactId> + <version>0.7.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>nifi-kafka-pubsub-processors</artifactId> + <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>0.9.0.1</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <version>0.9.0.1</version> + <exclusions> + <!-- Transitive dependencies excluded because they are located + in a legacy Maven repository, which Maven 3 doesn't support. 
--> + <exclusion> + <groupId>javax.jms</groupId> + <artifactId>jms</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jdmk</groupId> + <artifactId>jmxtools</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jmx</groupId> + <artifactId>jmxri</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java new file mode 100644 index 0000000..8bae304 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.FormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class for implementing {@link Processor}s to publish and consume + * messages to/from Kafka + * + * @see PublishKafka + * @see ConsumeKafka + */ +abstract class AbstractKafkaProcessor<T extends Closeable> 
extends AbstractSessionFactoryProcessor { + + final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}"; + + private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*"; + + + static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT"); + static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL"); + static final AllowableValue SEC_SASL_PLAINTEXT = new AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT"); + static final AllowableValue SEC_SASL_SSL = new AllowableValue("SASL_SSL", "SASL_SSL", "SASL_SSL"); + + static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder() + .name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) + .displayName("Kafka Brokers") + .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX))) + .expressionLanguageSupported(true) + .defaultValue("localhost:9092") + .build(); + static final PropertyDescriptor CLIENT_ID = new PropertyDescriptor.Builder() + .name(ProducerConfig.CLIENT_ID_CONFIG) + .displayName("Client ID") + .description("String value uniquely identifying this client application. Corresponds to Kafka's 'client.id' property.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder() + .name("security.protocol") + .displayName("Security Protocol") + .description("Protocol used to communicate with brokers. 
Corresponds to Kafka's 'security.protocol' property.") + .required(false) + .expressionLanguageSupported(false) + .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL) + .defaultValue(SEC_PLAINTEXT.getValue()) + .build(); + static final PropertyDescriptor KERBEROS_PRINCIPLE = new PropertyDescriptor.Builder() + .name("sasl.kerberos.service.name") + .displayName("Kerberos Service Name") + .description("The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. " + + "Corresponds to Kafka's 'security.protocol' property." + + "It is ignored unless one of the SASL options of the <Security Protocol> are selected.") + .required(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + + static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() + .name("topic") + .displayName("Topic Name") + .description("The name of the Kafka Topic") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + static final Builder MESSAGE_DEMARCATOR_BUILDER = new PropertyDescriptor.Builder() + .name("message-demarcator") + .displayName("Message Demarcator") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles that are the are successfully sent to or received from Kafka are routed to this relationship") + .build(); + + static final List<PropertyDescriptor> SHARED_DESCRIPTORS = new ArrayList<>(); + + static final Set<Relationship> SHARED_RELATIONSHIPS = new HashSet<>(); + + private final AtomicInteger taskCounter = new AtomicInteger(); + + private volatile boolean acceptTask = true; + + static { + SHARED_DESCRIPTORS.add(BOOTSTRAP_SERVERS); + SHARED_DESCRIPTORS.add(TOPIC); + 
SHARED_DESCRIPTORS.add(CLIENT_ID); + SHARED_DESCRIPTORS.add(SECURITY_PROTOCOL); + SHARED_DESCRIPTORS.add(KERBEROS_PRINCIPLE); + SHARED_RELATIONSHIPS.add(REL_SUCCESS); + } + + /** + * Instance of {@link KafkaPublisher} or {@link KafkaConsumer} + */ + volatile T kafkaResource; + + /** + * This thread-safe operation will delegate to + * {@link #rendezvousWithKafka(ProcessContext, ProcessSession)} after first + * checking and creating (if necessary) Kafka resource which could be either + * {@link KafkaPublisher} or {@link KafkaConsumer}. It will also close and + * destroy the underlying Kafka resource upon catching an {@link Exception} + * raised by {@link #rendezvousWithKafka(ProcessContext, ProcessSession)}. + * After Kafka resource is destroyed it will be re-created upon the next + * invocation of this operation essentially providing a self healing mechanism + * to deal with potentially corrupted resource. + * <p> + * Keep in mind that upon catching an exception the state of this processor + * will be set to no longer accept any more tasks, until Kafka resource is reset. + * This means that in a multi-threaded situation currently executing tasks will + * be given a chance to complete while no new tasks will be accepted. + */ + @Override + public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { + if (this.acceptTask) { // acts as a circuit breaker to allow existing tasks to wind down so 'kafkaResource' can be reset before new tasks are accepted. + this.taskCounter.incrementAndGet(); + final ProcessSession session = sessionFactory.createSession(); + try { + /* + * We can't be doing double null check here since as a pattern + * it only works for lazy init but not reset, which is what we + * are doing here. In fact the first null check is dangerous + * since 'kafkaResource' can become null right after its null + * check passed causing subsequent NPE. 
+ */ + synchronized (this) { + if (this.kafkaResource == null) { + this.kafkaResource = this.buildKafkaResource(context, session); + } + } + + /* + * The 'processed' boolean flag does not imply any failure or success. It simply states that: + * - ConsumeKafka - some messages were received form Kafka and 1_ FlowFile were generated + * - PublishKafka - some messages were sent to Kafka based on existence of the input FlowFile + */ + boolean processed = this.rendezvousWithKafka(context, session); + session.commit(); + if (processed) { + this.postCommit(context); + } else { + context.yield(); + } + } catch (Throwable e) { + this.acceptTask = false; + session.rollback(true); + this.getLogger().error("{} failed to process due to {}; rolling back session", new Object[] { this, e }); + } finally { + synchronized (this) { + if (this.taskCounter.decrementAndGet() == 0 && !this.acceptTask) { + this.close(); + this.acceptTask = true; + } + } + } + } else { + this.logger.debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset."); + this.getLogger().debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset."); + context.yield(); + } + } + + /** + * Will call {@link Closeable#close()} on the target resource after which + * the target resource will be set to null. Should only be called when there + * are no more threads being executed on this processor or when it has been + * verified that only a single thread remains. 
+ * + * @see KafkaPublisher + * @see KafkaConsumer + */ + @OnStopped + public void close() { + try { + if (this.kafkaResource != null) { + try { + this.kafkaResource.close(); + } catch (Exception e) { + this.getLogger().warn("Failed while closing " + this.kafkaResource, e); + } + } + } finally { + this.kafkaResource = null; + } + } + + /** + * + */ + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") + .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true) + .build(); + } + + /** + * This operation is called from + * {@link #onTrigger(ProcessContext, ProcessSessionFactory)} method and + * contains main processing logic for this Processor. + */ + protected abstract boolean rendezvousWithKafka(ProcessContext context, ProcessSession session); + + /** + * Builds target resource for interacting with Kafka. The target resource + * could be one of {@link KafkaPublisher} or {@link KafkaConsumer} + */ + protected abstract T buildKafkaResource(ProcessContext context, ProcessSession session); + + /** + * This operation will be executed after {@link ProcessSession#commit()} has + * been called. 
+ */ + protected void postCommit(ProcessContext context) { + // no op + } + + /** + * + */ + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + List<ValidationResult> results = new ArrayList<>(); + + String securityProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue(); + + /* + * validates that if one of SASL (Kerberos) option is selected for + * security protocol, then Kerberos principal is provided as well + */ + if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)){ + String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).getValue(); + if (kerberosPrincipal == null || kerberosPrincipal.trim().length() == 0){ + results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) + .explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <" + + SECURITY_PROTOCOL.getDisplayName() + "> is configured as '" + + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.") + .build()); + } + } + + return results; + } + + /** + * Builds transit URI for provenance event. The transit URI will be in the + * form of <security.protocol>://<bootstrap.servers>/topic + */ + String buildTransitURI(String securityProtocol, String brokers, String topic) { + StringBuilder builder = new StringBuilder(); + builder.append(securityProtocol); + builder.append("://"); + builder.append(brokers); + builder.append("/"); + builder.append(topic); + return builder.toString(); + } + + /** + * Builds Kafka {@link Properties} + */ + Properties buildKafkaProperties(ProcessContext context) { + Properties properties = new Properties(); + for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) { + String pName = propertyDescriptor.getName(); + String pValue = propertyDescriptor.isExpressionLanguageSupported() + ? 
context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue() + : context.getProperty(propertyDescriptor).getValue(); + if (pValue != null) { + if (pName.endsWith(".ms")) { // kafka standard time notation + pValue = String.valueOf(FormatUtils.getTimeDuration(pValue.trim(), TimeUnit.MILLISECONDS)); + } + properties.setProperty(pName, pValue); + } + } + return properties; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java new file mode 100644 index 0000000..5949bf0 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; + +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@CapabilityDescription("Consumes messages from Apache Kafka") +@Tags({ "Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume" }) +public class ConsumeKafka extends AbstractKafkaProcessor<Consumer<byte[], byte[]>> { + + static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset"); + + static 
final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset"); + + static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group"); + + static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder() + .name(ConsumerConfig.GROUP_ID_CONFIG) + .displayName("Group ID") + .description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder() + .name(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) + .displayName("Offset Reset") + .description("Allows you to manage the condition when there is no initial offset in Kafka or if the current offset does not exist any " + + "more on the server (e.g. because that data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.") + .required(true) + .allowableValues(OFFSET_EARLIEST, OFFSET_LATEST, OFFSET_NONE) + .defaultValue(OFFSET_LATEST.getValue()) + .build(); + static final PropertyDescriptor MESSAGE_DEMARCATOR = MESSAGE_DEMARCATOR_BUILDER + .description("Since KafkaConsumer receives messages in batches, you have an option to output a single FlowFile which contains " + + "all Kafka messages in a single batch and this property allows you to provide a string (interpreted as UTF-8) to use " + + "for demarcating apart multiple Kafka messages. This is an optional property and if not provided each Kafka message received " + + "in a batch will result in a single FlowFile which essentially means that this processor may output multiple FlowFiles for each " + + "time it is triggered. 
To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS") + .build(); + + + static final List<PropertyDescriptor> DESCRIPTORS; + + static final Set<Relationship> RELATIONSHIPS; + + private volatile byte[] demarcatorBytes; + + private volatile String topic; + + private volatile String brokers; + + /* + * Will ensure that the list of the PropertyDescriptors is build only once, + * since all other lifecycle methods are invoked multiple times. + */ + static { + List<PropertyDescriptor> _descriptors = new ArrayList<>(); + _descriptors.addAll(SHARED_DESCRIPTORS); + _descriptors.add(GROUP_ID); + _descriptors.add(AUTO_OFFSET_RESET); + _descriptors.add(MESSAGE_DEMARCATOR); + DESCRIPTORS = Collections.unmodifiableList(_descriptors); + + RELATIONSHIPS = Collections.unmodifiableSet(SHARED_RELATIONSHIPS); + } + + /** + * + */ + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + /** + * Will unsubscribe form {@link KafkaConsumer} delegating to 'super' to do + * the rest. 
+ */ + @Override + @OnStopped + public void close() { + if (this.kafkaResource != null) { + try { + this.kafkaResource.unsubscribe(); + } finally { // in the event the above fails + super.close(); + } + } + } + + /** + * + */ + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + /** + * Will rendezvous with Kafka by performing the following: + * <br> + * - poll {@link ConsumerRecords} from {@link KafkaConsumer} in a + * non-blocking manner, signaling yield if no records were received from + * Kafka + * <br> + * - if records were received form Kafka, the are written to a newly created + * {@link FlowFile}'s {@link OutputStream} using a provided demarcator (see + * {@link #MESSAGE_DEMARCATOR} + */ + @Override + protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession processSession) { + ConsumerRecords<byte[], byte[]> consumedRecords = this.kafkaResource.poll(100); + if (consumedRecords != null && !consumedRecords.isEmpty()) { + long start = System.nanoTime(); + FlowFile flowFile = processSession.create(); + final AtomicInteger messageCounter = new AtomicInteger(); + + final Iterator<ConsumerRecord<byte[], byte[]>> iter = consumedRecords.iterator(); + while (iter.hasNext()){ + flowFile = processSession.append(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + ConsumerRecord<byte[], byte[]> consumedRecord = iter.next(); + if (messageCounter.getAndIncrement() > 0 && ConsumeKafka.this.demarcatorBytes != null) { + out.write(ConsumeKafka.this.demarcatorBytes); + } + out.write(consumedRecord.value()); + } + }); + /* + * Release FlowFile if there are more messages in the + * ConsumerRecords batch and no demarcator was provided, + * otherwise the FlowFile will be released as soon as this loop + * exits. 
+ */ + if (iter.hasNext() && ConsumeKafka.this.demarcatorBytes == null){ + this.releaseFlowFile(flowFile, context, processSession, start, messageCounter.get()); + flowFile = processSession.create(); + messageCounter.set(0); + } + } + this.releaseFlowFile(flowFile, context, processSession, start, messageCounter.get()); + } + return consumedRecords != null && !consumedRecords.isEmpty(); + } + + /** + * This operation is called from + * {@link #onTrigger(ProcessContext, ProcessSessionFactory)} method after + * the process session is committed so that then kafka offset changes can be + * committed. This can mean in cases of really bad timing we could have data + * duplication upon recovery but not data loss. We want to commit the flow + * files in a NiFi sense before we commit them in a Kafka sense. + */ + @Override + protected void postCommit(ProcessContext context) { + this.kafkaResource.commitSync(); + } + + /** + * Builds and instance of {@link KafkaConsumer} and subscribes to a provided + * topic. + */ + @Override + protected Consumer<byte[], byte[]> buildKafkaResource(ProcessContext context, ProcessSession session) { + this.demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() + ? 
context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8) + : null; + this.topic = context.getProperty(TOPIC).evaluateAttributeExpressions().getValue(); + this.brokers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); + + Properties kafkaProperties = this.buildKafkaProperties(context); + kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + + KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(kafkaProperties); + consumer.subscribe(Collections.singletonList(this.topic)); + return consumer; + } + + /** + * Will release flow file. Releasing of the flow file in the context of this + * operation implies the following: + * + * If Empty then remove from session and return If has something then + * transfer to {@link #REL_SUCCESS} + */ + private void releaseFlowFile(FlowFile flowFile, ProcessContext context, ProcessSession session, long start, int msgCount) { + long executionDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + String transitUri = this.buildTransitURI(context.getProperty(SECURITY_PROTOCOL).getValue(), this.brokers, topic); + session.getProvenanceReporter().receive(flowFile, transitUri, "Received " + msgCount + " Kafka messages", executionDuration); + this.getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[] { flowFile, msgCount, executionDuration }); + session.transfer(flowFile, REL_SUCCESS); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java new file mode 100644 index 0000000..f42a892 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.io.Closeable; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.stream.io.util.StreamDemarcator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Wrapper over {@link KafkaProducer} to assist {@link PublishKafka} processor + * with sending contents of the {@link FlowFile}s to Kafka. + */ +class KafkaPublisher implements Closeable { + + private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class); + + private final Producer<byte[], byte[]> kafkaProducer; + + private volatile long ackWaitTime = 30000; + + private volatile ProcessorLog processLog; + + /** + * Creates an instance of this class as well as the instance of the + * corresponding Kafka {@link KafkaProducer} using provided Kafka + * configuration properties. + * + * @param kafkaProperties + * instance of {@link Properties} used to bootstrap + * {@link KafkaProducer} + */ + KafkaPublisher(Properties kafkaProperties) { + this.kafkaProducer = new KafkaProducer<>(kafkaProperties); + } + + /** + * Publishes messages to Kafka topic. It uses {@link StreamDemarcator} to + * determine how many messages to Kafka will be sent from a provided + * {@link InputStream} (see {@link PublishingContext#getContentStream()}). 
+ * It supports two publishing modes: + * <ul> + * <li>Sending all messages constructed from + * {@link StreamDemarcator#nextToken()} operation.</li> + * <li>Sending only unacknowledged messages constructed from + * {@link StreamDemarcator#nextToken()} operation.</li> + * </ul> + * The unacknowledged messages are determined from the value of + * {@link PublishingContext#getLastAckedMessageIndex()}. + * <br> + * This method assumes content stream affinity where it is expected that the + * content stream that represents the same Kafka message(s) will remain the + * same across possible retries. This is required specifically for cases + * where delimiter is used and a single content stream may represent + * multiple Kafka messages. The + * {@link PublishingContext#getLastAckedMessageIndex()} will provide the + * index of the last ACKed message, so upon retry only messages with the + * higher index are sent. + * + * @param publishingContext + * instance of {@link PublishingContext} which hold context + * information about the message(s) to be sent. + * @return The index of the last successful offset. 
+ */ + KafkaPublisherResult publish(PublishingContext publishingContext) { + StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(), + publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize()); + + int prevLastAckedMessageIndex = publishingContext.getLastAckedMessageIndex(); + List<Future<RecordMetadata>> resultFutures = new ArrayList<>(); + + byte[] messageBytes; + int tokenCounter = 0; + for (; (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) { + if (prevLastAckedMessageIndex < tokenCounter) { + ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(publishingContext.getTopic(), publishingContext.getKeyBytes(), messageBytes); + resultFutures.add(this.kafkaProducer.send(message)); + } + } + + int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex); + return new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex); + } + + /** + * Sets the time this publisher will wait for the {@link Future#get()} + * operation (the Future returned by + * {@link KafkaProducer#send(ProducerRecord)}) to complete before timing + * out. + * + * This value will also be used as a timeout when closing the underlying + * {@link KafkaProducer}. See {@link #close()}. + */ + void setAckWaitTime(long ackWaitTime) { + this.ackWaitTime = ackWaitTime; + } + + /** + * This operation will process ACKs from Kafka in the order in which + * {@link KafkaProducer#send(ProducerRecord)} invocation were made returning + * the index of the last ACKed message. Within this operation processing ACK + * simply means successful invocation of 'get()' operation on the + * {@link Future} returned by {@link KafkaProducer#send(ProducerRecord)} + * operation. Upon encountering any type of error while interrogating such + * {@link Future} the ACK loop will end. Messages that were not ACKed would + * be considered non-delivered and therefore could be resent at the later + * time. 
+ * + * @param sendFutures + * list of {@link Future}s representing results of publishing to + * Kafka + * + * @param lastAckMessageIndex + * the index of the last ACKed message. It is important to + * provide the last ACKed message especially while re-trying so + * the proper index is maintained. + */ + private int processAcks(List<Future<RecordMetadata>> sendFutures, int lastAckMessageIndex) { + boolean exceptionThrown = false; + for (int segmentCounter = 0; segmentCounter < sendFutures.size() && !exceptionThrown; segmentCounter++) { + Future<RecordMetadata> future = sendFutures.get(segmentCounter); + try { + future.get(this.ackWaitTime, TimeUnit.MILLISECONDS); + lastAckMessageIndex++; + } catch (InterruptedException e) { + exceptionThrown = true; + Thread.currentThread().interrupt(); + this.warnOrError("Interrupted while waiting for acks from Kafka", null); + } catch (ExecutionException e) { + exceptionThrown = true; + this.warnOrError("Failed while waiting for acks from Kafka", e); + } catch (TimeoutException e) { + exceptionThrown = true; + this.warnOrError("Timed out while waiting for acks from Kafka", null); + } + } + + return lastAckMessageIndex; + } + + /** + * Will close the underlying {@link KafkaProducer} waiting if necessary for + * the same duration as supplied {@link #setAckWaitTime(long)} + */ + @Override + public void close() { + this.kafkaProducer.close(this.ackWaitTime, TimeUnit.MILLISECONDS); + } + + /** + * Will set {@link ProcessorLog} as an additional logger to forward log + * messages to NiFi bulletin + */ + void setProcessLog(ProcessorLog processLog) { + this.processLog = processLog; + } + + /** + * + */ + private void warnOrError(String message, Exception e) { + if (e == null) { + logger.warn(message); + if (this.processLog != null) { + this.processLog.warn(message); + } + } else { + logger.error(message, e); + if (this.processLog != null) { + this.processLog.error(message, e); + } + } + } + + /** + * Encapsulates the result received from 
publishing messages to Kafka + */ + static class KafkaPublisherResult { + private final int messagesSent; + private final int lastMessageAcked; + KafkaPublisherResult(int messagesSent, int lastMessageAcked) { + this.messagesSent = messagesSent; + this.lastMessageAcked = lastMessageAcked; + } + + public int getMessagesSent() { + return this.messagesSent; + } + + public int getLastMessageAcked() { + return this.lastMessageAcked; + } + + public boolean isAllAcked() { + return this.messagesSent - 1 == this.lastMessageAcked; + } + + @Override + public String toString() { + return "Sent:" + this.messagesSent + "; Last ACK:" + this.lastMessageAcked; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java new file mode 100644 index 0000000..8c948df --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.util.Map; + +import org.apache.kafka.clients.producer.Partitioner; +import org.apache.kafka.common.Cluster; + +/** + * Collection of implementation of common Kafka {@link Partitioner}s. + */ +final public class Partitioners { + + private Partitioners() { + } + + /** + * {@link Partitioner} that implements 'round-robin' mechanism which evenly + * distributes load between all available partitions. + */ + public static class RoundRobinPartitioner implements Partitioner { + private volatile int index; + + @Override + public void configure(Map<String, ?> configs) { + // noop + } + + @Override + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + return this.next(cluster.availablePartitionsForTopic(topic).size()); + } + + @Override + public void close() { + // noop + } + + private synchronized int next(int numberOfPartitions) { + if (this.index >= numberOfPartitions) { + this.index = 0; + } + return index++; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java new file mode 100644 index 0000000..6235f0b --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult; +import org.apache.nifi.processors.kafka.pubsub.Partitioners.RoundRobinPartitioner; + +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" }) +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. 
The messages to send may be individual FlowFiles or may be delimited, using a " + + "user-specified delimiter, such as a new-line.") +@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", + description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." + + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged." + + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.") +public class PublishKafka extends AbstractKafkaProcessor<KafkaPublisher> { + + protected static final String FAILED_PROC_ID_ATTR = "failed.proc.id"; + + protected static final String FAILED_LAST_ACK_IDX = "failed.last.idx"; + + protected static final String FAILED_TOPIC_ATTR = "failed.topic"; + + protected static final String FAILED_KEY_ATTR = "failed.key"; + + protected static final String FAILED_DELIMITER_ATTR = "failed.delimiter"; + + protected static final String MSG_COUNT = "msg.count"; + + static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", + "FlowFile will be routed to failure unless the message is replicated to the appropriate " + + "number of Kafka Nodes according to the Topic configuration"); + static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", + "FlowFile will be routed to success if the message is received by a single Kafka node, " + + "whether or not it is replicated. 
This is faster than <Guarantee Replicated Delivery> " + + "but can result in data loss if a Kafka node crashes"); + static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", + "FlowFile will be routed to success after successfully writing the content to a Kafka node, " + + "without waiting for a response. This provides the best performance but may result in data loss."); + + static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(RoundRobinPartitioner.class.getName(), + RoundRobinPartitioner.class.getSimpleName(), + "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, " + + "the next Partition to Partition 2, and so on, wrapping as necessary."); + static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner", + "DefaultPartitioner", "Messages will be assigned to random partitions."); + + static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() + .name(ProducerConfig.ACKS_CONFIG) + .displayName("Delivery Guarantee") + .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.") + .required(true) + .expressionLanguageSupported(false) + .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) + .defaultValue(DELIVERY_BEST_EFFORT.getValue()) + .build(); + static final PropertyDescriptor META_WAIT_TIME = new PropertyDescriptor.Builder() + .name(ProducerConfig.MAX_BLOCK_MS_CONFIG) + .displayName("Meta Data Wait Time") + .description("The amount of time KafkaConsumer will wait to obtain metadata during the 'send' call before failing the " + + "entire 'send' call. 
Corresponds to Kafka's 'max.block.ms' property") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(true) + .defaultValue("30 sec") + .build(); + static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() + .name("kafka-key") + .displayName("Kafka Key") + .description("The Key to use for the Message") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + static final PropertyDescriptor MESSAGE_DEMARCATOR = MESSAGE_DEMARCATOR_BUILDER + .description("Specifies the string (interpreted as UTF-8) to use for demarcating apart multiple messages within " + + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the " + + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. " + + "To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on your OS.") + .build(); + static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder() + .name(ProducerConfig.PARTITIONER_CLASS_CONFIG) + .displayName("Partitioner class") + .description("Specifies which class to use to compute a partition id for a message. 
Corresponds to Kafka's 'partitioner.class' property.") + .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING) + .defaultValue(RANDOM_PARTITIONING.getValue()) + .required(false) + .build(); + static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() + .name(ProducerConfig.COMPRESSION_TYPE_CONFIG) + .displayName("Compression Type") + .description("This parameter allows you to specify the compression codec for all data generated by this producer.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues("none", "gzip", "snappy", "lz4") + .defaultValue("none") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") + .build(); + + static final List<PropertyDescriptor> DESCRIPTORS; + + static final Set<Relationship> RELATIONSHIPS; + + private volatile String brokers; + + /* + * Will ensure that list of PropertyDescriptors is built only once, since + * all other lifecycle methods are invoked multiple times. 
+ */ + static { + List<PropertyDescriptor> _descriptors = new ArrayList<>(); + _descriptors.addAll(SHARED_DESCRIPTORS); + _descriptors.add(DELIVERY_GUARANTEE); + _descriptors.add(KEY); + _descriptors.add(MESSAGE_DEMARCATOR); + _descriptors.add(META_WAIT_TIME); + _descriptors.add(PARTITION_CLASS); + _descriptors.add(COMPRESSION_CODEC); + + DESCRIPTORS = Collections.unmodifiableList(_descriptors); + + Set<Relationship> _relationships = new HashSet<>(); + _relationships.addAll(SHARED_RELATIONSHIPS); + _relationships.add(REL_FAILURE); + RELATIONSHIPS = Collections.unmodifiableSet(_relationships); + } + + /** + * + */ + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + /** + * + */ + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + /** + * Will rendezvous with Kafka if {@link ProcessSession} contains {@link FlowFile} + * producing a result {@link FlowFile}. + * <br> + * The result {@link FlowFile} that is successful is then transfered to {@link #REL_SUCCESS} + * <br> + * The result {@link FlowFile} that is failed is then transfered to {@link #REL_FAILURE} + * + */ + @Override + protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session){ + FlowFile flowFile = session.get(); + if (flowFile != null) { + long start = System.nanoTime(); + flowFile = this.doRendezvousWithKafka(flowFile, context, session); + Relationship relationship = REL_SUCCESS; + if (!this.isFailedFlowFile(flowFile)) { + String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); + long executionDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + String transitUri = this.buildTransitURI(context.getProperty(SECURITY_PROTOCOL).getValue(), this.brokers, topic); + session.getProvenanceReporter().send(flowFile, transitUri, "Sent " + flowFile.getAttribute(MSG_COUNT) + " Kafka messages", executionDuration); + 
this.getLogger().info("Successfully sent {} to Kafka as {} message(s) in {} millis", new Object[] { flowFile, flowFile.getAttribute(MSG_COUNT), executionDuration }); + } else { + relationship = REL_FAILURE; + flowFile = session.penalize(flowFile); + } + session.transfer(flowFile, relationship); + } + return flowFile != null; + } + + /** + * Builds an instance of {@link KafkaPublisher}. + */ + @Override + protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) { + Properties kafkaProperties = this.buildKafkaProperties(context); + kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + this.brokers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); + return new KafkaPublisher(kafkaProperties); + } + + /** + * Will rendezvous with {@link KafkaPublisher} after building + * {@link PublishingContext} and will produce the resulting {@link FlowFile}. + * The resulting FlowFile contains all required information to determine + * if message publishing originating from the provided FlowFile has actually + * succeeded fully, partially or failed completely (see + * {@link #isFailedFlowFile(FlowFile)}. 
+ */ + private FlowFile doRendezvousWithKafka(final FlowFile flowFile, final ProcessContext context, final ProcessSession session) { + final AtomicReference<KafkaPublisherResult> publishResultRef = new AtomicReference<>(); + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(InputStream contentStream) throws IOException { + PublishingContext publishingContext = PublishKafka.this.buildPublishingContext(flowFile, context, contentStream); + KafkaPublisherResult result = PublishKafka.this.kafkaResource.publish(publishingContext); + publishResultRef.set(result); + } + }); + + FlowFile resultFile = publishResultRef.get().isAllAcked() + ? this.cleanUpFlowFileIfNecessary(flowFile, session) + : session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(publishResultRef.get().getLastMessageAcked(), flowFile, context)); + + if (!this.isFailedFlowFile(resultFile)) { + resultFile = session.putAttribute(resultFile, MSG_COUNT, String.valueOf(publishResultRef.get().getMessagesSent())); + } + return resultFile; + } + + /** + * Builds {@link PublishingContext} for message(s) to be sent to Kafka. + * {@link PublishingContext} contains all contextual information required by + * {@link KafkaPublisher} to publish to Kafka. Such information contains + * things like topic name, content stream, delimiter, key and last ACKed + * message for cases where provided FlowFile is being retried (failed in the + * past). + * <br> + * For the clean FlowFile (file that has been sent for the first time), + * PublishingContext will be built from {@link ProcessContext} associated + * with this invocation. + * <br> + * For the failed FlowFile, {@link PublishingContext} will be built from + * attributes of that FlowFile which by then will already contain required + * information (e.g., topic, key, delimiter etc.). This is required to + * ensure the affinity of the retry in the event where processor + * configuration has changed. 
However keep in mind that failed FlowFile is + * only considered a failed FlowFile if it is being re-processed by the same + * processor (determined via {@link #FAILED_PROC_ID_ATTR}, see + * {@link #isFailedFlowFile(FlowFile)}). If failed FlowFile is being sent to + * another PublishKafka processor it is treated as a fresh FlowFile + * regardless if it has #FAILED* attributes set. + */ + private PublishingContext buildPublishingContext(FlowFile flowFile, ProcessContext context, InputStream contentStream) { + String topicName; + byte[] keyBytes; + byte[] delimiterBytes = null; + int lastAckedMessageIndex = -1; + if (this.isFailedFlowFile(flowFile)) { + lastAckedMessageIndex = Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX)); + topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR); + keyBytes = flowFile.getAttribute(FAILED_KEY_ATTR) != null + ? flowFile.getAttribute(FAILED_KEY_ATTR).getBytes(StandardCharsets.UTF_8) : null; + delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != null + ? flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : null; + + } else { + topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); + String _key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + keyBytes = _key == null ? null : _key.getBytes(StandardCharsets.UTF_8); + delimiterBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? 
context.getProperty(MESSAGE_DEMARCATOR) + .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null; + } + + PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex); + publishingContext.setKeyBytes(keyBytes); + publishingContext.setDelimiterBytes(delimiterBytes); + return publishingContext; + } + + /** + * Will remove FAILED_* attributes if FlowFile is no longer considered a + * failed FlowFile + * + * @see #isFailedFlowFile(FlowFile) + */ + private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, ProcessSession session) { + if (this.isFailedFlowFile(flowFile)) { + Set<String> keysToRemove = new HashSet<>(); + keysToRemove.add(FAILED_DELIMITER_ATTR); + keysToRemove.add(FAILED_KEY_ATTR); + keysToRemove.add(FAILED_TOPIC_ATTR); + keysToRemove.add(FAILED_PROC_ID_ATTR); + keysToRemove.add(FAILED_LAST_ACK_IDX); + flowFile = session.removeAllAttributes(flowFile, keysToRemove); + } + return flowFile; + } + + /** + * Builds a {@link Map} of FAILED_* attributes + * + * @see #FAILED_PROC_ID_ATTR + * @see #FAILED_LAST_ACK_IDX + * @see #FAILED_TOPIC_ATTR + * @see #FAILED_KEY_ATTR + * @see #FAILED_DELIMITER_ATTR + */ + private Map<String, String> buildFailedFlowFileAttributes(int lastAckedMessageIndex, FlowFile sourceFlowFile, ProcessContext context) { + Map<String, String> attributes = new HashMap<>(); + attributes.put(FAILED_PROC_ID_ATTR, this.getIdentifier()); + attributes.put(FAILED_LAST_ACK_IDX, String.valueOf(lastAckedMessageIndex)); + attributes.put(FAILED_TOPIC_ATTR, context.getProperty(TOPIC).evaluateAttributeExpressions(sourceFlowFile).getValue()); + attributes.put(FAILED_KEY_ATTR, context.getProperty(KEY).evaluateAttributeExpressions(sourceFlowFile).getValue()); + attributes.put(FAILED_DELIMITER_ATTR, context.getProperty(MESSAGE_DEMARCATOR).isSet() + ? 
context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(sourceFlowFile).getValue() : null); + return attributes; + } + + /** + * Returns 'true' if provided FlowFile is a failed FlowFile. A failed + * FlowFile contains {@link #FAILED_PROC_ID_ATTR}. + */ + private boolean isFailedFlowFile(FlowFile flowFile) { + return this.getIdentifier().equals(flowFile.getAttribute(FAILED_PROC_ID_ATTR)); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java new file mode 100644 index 0000000..bda29e6 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +/** + * Holder of context information used by {@link KafkaPublisher} required to + * publish messages to Kafka. + */ +class PublishingContext { + + private final InputStream contentStream; + + private final String topic; + + private final int lastAckedMessageIndex; + + /* + * We're using the default value from Kafka. We are using it to control the + * message size before it goes to Kafka thus limiting possibility of + * late failures in Kafka client. + */ + private int maxRequestSize = 1048576; // kafka default + + private boolean maxRequestSizeSet; + + private byte[] keyBytes; + + private byte[] delimiterBytes; + + PublishingContext(InputStream contentStream, String topic) { + this(contentStream, topic, -1); + } + + PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex) { + this.validateInput(contentStream, topic, lastAckedMessageIndex); + this.contentStream = contentStream; + this.topic = topic; + this.lastAckedMessageIndex = lastAckedMessageIndex; + } + + @Override + public String toString() { + return "topic: '" + this.topic + "'; delimiter: '" + new String(this.delimiterBytes, StandardCharsets.UTF_8) + "'"; + } + + int getLastAckedMessageIndex() { + return this.lastAckedMessageIndex; + } + + int getMaxRequestSize() { + return this.maxRequestSize; + } + + byte[] getKeyBytes() { + return this.keyBytes; + } + + byte[] getDelimiterBytes() { + return this.delimiterBytes; + } + + InputStream getContentStream() { + return this.contentStream; + } + + String getTopic() { + return this.topic; + } + + void setKeyBytes(byte[] keyBytes) { + if (this.keyBytes == null) { + if (keyBytes != null) { + this.assertBytesValid(keyBytes); + this.keyBytes = keyBytes; + } + } else { + throw new 
IllegalArgumentException("'keyBytes' can only be set once per instance"); + } + } + + void setDelimiterBytes(byte[] delimiterBytes) { + if (this.delimiterBytes == null) { + if (delimiterBytes != null) { + this.assertBytesValid(delimiterBytes); + this.delimiterBytes = delimiterBytes; + } + } else { + throw new IllegalArgumentException("'delimiterBytes' can only be set once per instance"); + } + } + + void setMaxRequestSize(int maxRequestSize) { + if (!this.maxRequestSizeSet) { + if (maxRequestSize > 0) { + this.maxRequestSize = maxRequestSize; + this.maxRequestSizeSet = true; + } else { + throw new IllegalArgumentException("'maxRequestSize' must be > 0"); + } + } else { + throw new IllegalArgumentException("'maxRequestSize' can only be set once per instance"); + } + } + + private void assertBytesValid(byte[] bytes) { + if (bytes != null) { + if (bytes.length == 0) { + throw new IllegalArgumentException("'bytes' must not be empty"); + } + } + } + + private void validateInput(InputStream contentStream, String topic, int lastAckedMessageIndex) { + if (contentStream == null) { + throw new IllegalArgumentException("'contentStream' must not be null"); + } else if (topic == null || topic.trim().length() == 0) { + throw new IllegalArgumentException("'topic' must not be null or empty"); + } else if (lastAckedMessageIndex < -1) { + throw new IllegalArgumentException("'lastAckedMessageIndex' must be >= -1"); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 
100644 index 0000000..28b8393 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.kafka.pubsub.PublishKafka +org.apache.nifi.processors.kafka.pubsub.ConsumeKafka \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html new file mode 100644 index 0000000..0e09f72 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html @@ 
-0,0 +1,33 @@ +<!DOCTYPE html> +<html lang="en"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <head> + <meta charset="utf-8" /> + <title>ConsumeKafka</title> + <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> + </head> + + <body> + <!-- Processor Documentation ================================================== --> + <h2>Description:</h2> + <p> + This Processors polls <a href="http://kafka.apache.org/">Apache Kafka</a> + for data using KafkaConsumer API available with Kafka 0.9+. When a message is received + from Kafka, this Processor emits a FlowFile where the content of the FlowFile is the value + of the Kafka message. 
+ </p> + </body> +</html> http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html new file mode 100644 index 0000000..20ce03c --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html @@ -0,0 +1,47 @@ +<!DOCTYPE html> +<html lang="en"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + <head> + <meta charset="utf-8" /> + <title>PublishKafka</title> + <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> + </head> + + <body> + <!-- Processor Documentation ================================================== --> + <h2>Description:</h2> + <p> + This Processors puts the contents of a FlowFile to a Topic in + <a href="http://kafka.apache.org/">Apache Kafka</a> using KafkaProducer API available + with Kafka 0.9+ API. The content of a FlowFile becomes the contents of a Kafka message. + This message is optionally assigned a key by using the <Kafka Key> Property. + </p> + + <p> + The Processor allows the user to configure an optional Message Demarcator that + can be used to send many messages per FlowFile. For example, a <i>\n</i> could be used + to indicate that the contents of the FlowFile should be used to send one message + per line of text. It also supports multi-char demarcators (e.g., 'my custom demarcator'). + If the property is not set, the entire contents of the FlowFile + will be sent as a single message. When using the demarcator, if some messages are + successfully sent but other messages fail to send, the resulting FlowFile will be + considered a failed FlowFuile and will have additional attributes to that effect. + One of such attributes is 'failed.last.idx' which indicates the index of the last message + that was successfully ACKed by Kafka. (if no demarcator is used the value of this index will be -1). + This will allow PublishKafka to only re-send un-ACKed messages on the next re-try. + </p> + </body> +</html>
