Repository: nifi Updated Branches: refs/heads/0.x 40618364e -> 99874055b
NIFI-2801 Fixed checkstyle line length issue in PutKafka. (+1 squashed commit) Squashed commits: [6e8f160] NIFI-2801 Edited Kafka processor documentation to explicitly state which Kafka versions supported by each processor. This closes #1124. Signed-off-by: Andy LoPresto <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/99874055 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/99874055 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/99874055 Branch: refs/heads/0.x Commit: 99874055bd649895dd4ddc68e294998929cbb001 Parents: 4061836 Author: Andrew Lim <[email protected]> Authored: Tue Oct 11 11:30:24 2016 -0400 Committer: Andy LoPresto <[email protected]> Committed: Wed Oct 12 19:25:57 2016 -0700 ---------------------------------------------------------------------- nifi-docs/src/main/asciidoc/getting-started.adoc | 8 ++++---- .../java/org/apache/nifi/processors/kafka/GetKafka.java | 6 +++--- .../java/org/apache/nifi/processors/kafka/PutKafka.java | 11 ++++++----- .../nifi/processors/kafka/pubsub/ConsumeKafka.java | 4 ++-- .../nifi/processors/kafka/pubsub/PublishKafka.java | 6 +++--- .../additionalDetails.html | 6 +++--- .../additionalDetails.html | 6 +++--- 7 files changed, 24 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/99874055/nifi-docs/src/main/asciidoc/getting-started.adoc ---------------------------------------------------------------------- diff --git a/nifi-docs/src/main/asciidoc/getting-started.adoc b/nifi-docs/src/main/asciidoc/getting-started.adoc index 486606f..247f261 100644 --- a/nifi-docs/src/main/asciidoc/getting-started.adoc +++ b/nifi-docs/src/main/asciidoc/getting-started.adoc @@ -138,7 +138,7 @@ background) and drop it there. This will give us a dialog that allows us to choo image:add-processor.png["Add Processor"] We have quite a few options to choose from. For the sake of becoming oriented with the system, let's say that we -just want to bring in files from our local disk into NiFi. When a developer creates a Processor, the developer can +just want to bring in files from our local disk into NiFi. When a developer creates a Processor, the developer can assign "tags" to that Processor. These can be thought of as keywords. We can filter by these tags or by Processor name by typing into the Filter box in the top-right corner of the dialog. Type in the keywords that you would think of when wanting to ingest files from a local disk. Typing in keyword "file", for instance, will provide us a few @@ -221,7 +221,7 @@ the prioritizer. If multiple prioritizers are activated, they will be evaluated will be evaluated first and if two FlowFiles are determined to be equal according to that Prioritizers, the second Prioritizer will be used. -For the sake of this discussion, we can simply click `Add`. to add the Connection to our graph. We should now see that the Alert +For the sake of this discussion, we can simply click `Add`. to add the Connection to our graph. We should now see that the Alert icon has changed to a Stopped icon ( image:iconStop.png[Stopped] ). The LogAttribute Processor, however, is now invalid because its `success` Relationship has not been connected to @@ -367,7 +367,7 @@ categorizing them by their functions. the content fetched from HDFS. - *FetchS3Object*: Fetches the contents of an object from the Amazon Web Services (AWS) Simple Storage Service (S3). The outbound FlowFile contains the contents received from S3. -- *GetKafka*: Consumes messages from Apache Kafka. The messages can be emitted as a FlowFile per message or can be batched together using a user-specified +- *GetKafka*: Fetches messages from Apache Kafka, specifically for 0.8.x versions. The messages can be emitted as a FlowFile per message or can be batched together using a user-specified delimiter. - *GetMongo*: Executes a user-specified query against MongoDB and writes the contents to a new FlowFile. - *GetTwitter*: Allows Users to register a filter to listen to the Twitter "garden hose" or Enterprise endpoint, creating a FlowFile for each tweet @@ -382,7 +382,7 @@ categorizing them by their functions. - *PutSQL*: Executes the contents of a FlowFile as a SQL DDL Statement (INSERT, UPDATE, or DELETE). The contents of the FlowFile must be a valid SQL statement. Attributes can be used as parameters so that the contents of the FlowFile can be parameterized SQL statements in order to avoid SQL injection attacks. -- *PutKafka*: Sends the contents of a FlowFile to Kafka as a message. The FlowFile can be sent as a single message or a delimiter, such as a +- *PutKafka*: Sends the contents of a FlowFile as a message to Apache Kafka, specifically for 0.8.x versions. The FlowFile can be sent as a single message or a delimiter, such as a new-line can be specified, in order to send many messages for a single FlowFile. - *PutMongo*: Sends the contents of a FlowFile to Mongo as an INSERT or an UPDATE. http://git-wip-us.apache.org/repos/asf/nifi/blob/99874055/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java index 7660305..84d6b89 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java @@ -69,8 +69,8 @@ import kafka.message.MessageAndMetadata; @SupportsBatching @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Fetches messages from Apache Kafka") -@Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"}) +@CapabilityDescription("Fetches messages from Apache Kafka, specifically for 0.8.x versions. The complementary NiFi processor for sending messages is PutKafka.") +@Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub", "0.8.x"}) @WritesAttributes({ @WritesAttribute(attribute = "kafka.topic", description = "The name of the Kafka Topic from which the message was received"), @WritesAttribute(attribute = "kafka.key", description = "The key of the Kafka message, if it exists and batch size is 1. If" @@ -80,7 +80,7 @@ import kafka.message.MessageAndMetadata; @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." + " In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be" - + " overriden with warning message describing the override." + + " overridden with warning message describing the override." + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.") public class GetKafka extends AbstractProcessor { http://git-wip-us.apache.org/repos/asf/nifi/blob/99874055/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index 12a9b89..abdf73d 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -53,9 +53,10 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.kafka.KafkaPublisher.KafkaPublisherResult; @InputRequirement(Requirement.INPUT_REQUIRED) -@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" }) -@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. The messages to send may be individual FlowFiles or may be delimited, using a " - + "user-specified delimiter, such as a new-line.") +@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub", "0.8.x"}) +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka, , specifically for 0.8.x versions. " + + "The messages to send may be individual FlowFiles or may be delimited, using a " + + "user-specified delimiter, such as a new-line. The complementary NiFi processor for fetching messages is GetKafka.") @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." + " In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be" @@ -274,9 +275,9 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> { * Will rendezvous with Kafka if {@link ProcessSession} contains {@link FlowFile} * producing a result {@link FlowFile}. * <br> - * The result {@link FlowFile} that is successful is then transfered to {@link #REL_SUCCESS} + * The result {@link FlowFile} that is successful is then transferred to {@link #REL_SUCCESS} * <br> - * The result {@link FlowFile} that is failed is then transfered to {@link #REL_FAILURE} + * The result {@link FlowFile} that is failed is then transferred to {@link #REL_FAILURE} * */ @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/99874055/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java index ac5b4c5..0b0a478 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java @@ -55,8 +55,8 @@ import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Consumes messages from Apache Kafka") -@Tags({ "Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume" }) +@CapabilityDescription("Consumes messages from Apache Kafka,specifically built against the Kafka 0.9.x Consumer API. The complementary NiFi processor for sending messages is PublishKafka.") +@Tags({ "Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.9.x"}) public class ConsumeKafka extends AbstractKafkaProcessor<Consumer<byte[], byte[]>> { static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset"); http://git-wip-us.apache.org/repos/asf/nifi/blob/99874055/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java index 6703c04..f9b473f 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java @@ -49,9 +49,9 @@ import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResu import org.apache.nifi.processors.kafka.pubsub.Partitioners.RoundRobinPartitioner; @InputRequirement(Requirement.INPUT_REQUIRED) -@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" }) -@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. The messages to send may be individual FlowFiles or may be delimited, using a " - + "user-specified delimiter, such as a new-line.") +@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub","0.9.x"}) +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka, using the Kafka 0.9.x Producer. The messages to send may be individual FlowFiles or may be delimited, using a " + + "user-specified delimiter, such as a new-line. The complementary NiFi processor for fetching messages is ConsumeKafka.") @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged." http://git-wip-us.apache.org/repos/asf/nifi/blob/99874055/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html index 0e09f72..3ea37e2 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html @@ -24,9 +24,9 @@ <!-- Processor Documentation ================================================== --> <h2>Description:</h2> <p> - This Processors polls <a href="http://kafka.apache.org/">Apache Kafka</a> - for data using KafkaConsumer API available with Kafka 0.9+. When a message is received - from Kafka, this Processor emits a FlowFile where the content of the FlowFile is the value + This Processor polls <a href="http://kafka.apache.org/">Apache Kafka</a> + for data using KafkaConsumer API available with Kafka 0.9.x. When a message is received + from Kafka, this Processor emits a FlowFile where the content of the FlowFile is the value of the Kafka message. </p> </body> http://git-wip-us.apache.org/repos/asf/nifi/blob/99874055/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html index 20ce03c..148347a 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html @@ -24,9 +24,9 @@ <!-- Processor Documentation ================================================== --> <h2>Description:</h2> <p> - This Processors puts the contents of a FlowFile to a Topic in + This Processor puts the contents of a FlowFile to a Topic in <a href="http://kafka.apache.org/">Apache Kafka</a> using KafkaProducer API available - with Kafka 0.9+ API. The content of a FlowFile becomes the contents of a Kafka message. + with Kafka 0.9.x API. The content of a FlowFile becomes the contents of a Kafka message. This message is optionally assigned a key by using the <Kafka Key> Property. </p> @@ -38,7 +38,7 @@ If the property is not set, the entire contents of the FlowFile will be sent as a single message. When using the demarcator, if some messages are successfully sent but other messages fail to send, the resulting FlowFile will be - considered a failed FlowFuile and will have additional attributes to that effect. + considered a failed FlowFile and will have additional attributes to that effect. One of such attributes is 'failed.last.idx' which indicates the index of the last message that was successfully ACKed by Kafka. (if no demarcator is used the value of this index will be -1). This will allow PublishKafka to only re-send un-ACKed messages on the next re-try.
