pvillard31 commented on a change in pull request #4672:
URL: https://github.com/apache/nifi/pull/4672#discussion_r526136155
##########
File path:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
written to a FlowFile by serializing the message with the
configured Record Writer.
</p>
+ <h2>Consumer Partition Assignment</h2>
+ <p>
+ By default, this processor will subscribe to one or more Kafka topics in such a way that the topics to consume from are randomly
+ assigned to to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
Review comment:
```suggestion
assigned to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
```
##########
File path:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
written to a FlowFile by serializing the message with the
configured Record Writer.
</p>
+ <h2>Consumer Partition Assignment</h2>
+ <p>
+ By default, this processor will subscribe to one or more Kafka topics in such a way that the topics to consume from are randomly
+ assigned to to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
+ NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5.
+ Node 3 will then be assigned partitions 6 and 7.
+ </p>
+
+ <p>
+ In this scenario, if Node 3 somehow fails or stops pulling data from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
+ For most use cases, this is desirable. It provides fault tolerance and allows the remaining nodes to pick up the slack. However, there are cases
+ where this is undesirable.
+ </p>
+
+ <p>
+ One such case is when using NiFi to consume Change Data Capture (CDC) data from Kafka. Consider again the above scenario. Consider that Node 3
+ has pulled 1,000 messages from Kafka but has not yet delivered them to their final destination. NiFi is then stopped and restarted, and that takes
+ 15 minutes to complete. In the meantime, Partitions 6 and 7 have been reassigned to the other nodes. Those nodes then proceeded to pull data from
+ Kafka and deliver it to the desired destination. After 15 minutes, Node 3 rejoins the cluster and then continues to deliver its 1,000 messages that
+ it has already pulled from Kafka to the destination system. Now, those records have been delivered out of order.
+ </p>
+
+ <p>
+ The solution for this, then, is to assign partitions statically instead of dynamically. In this way, we can assign Partitions 6 and 7 to Node 3 specifically.
+ Then, if Node 3 is restarted, the other nodes will not pull data from Partitions 6 and 7. The data will remain queued in Kafka until Node 3 is restarted. By
+ using this approach, we can ensure that the data that already was pulled can be processed (assuming First In First Out Prioritizers are used) before newer messages
+ are handled.
+ </p>
+
+ <p>
+ In order to provide a static mapping of node to Kafka partition(s), one or more user-defined properties must be added using the naming scheme
+ <code>partitions.<hostname></code> with the value being a comma-separated list of Kafka partitions to use. For example,
+ <code>partitions.nifi-01=0, 3, 6, 9</code>, <code>partitions.nifi-02=1, 4, 7, 10</code>, and <code>partitions.nifi-03=2, 5, 8, 11</code>.
+ The hostname that is used can be the fully qualified hostname, the "simple" hostname, or the IP address. There must be an entry for each node in
+ the cluster, or the Processor will become invalid. If it is desirable for a node to not have any partitions assigned to it, a Property may be
+ added for the hostname with an empty string as the value.
+ </p>
+
+ <p>
+ In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern."
Review comment:
```suggestion
In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern".
```
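The 8-partitions/3-nodes scenario in the hunk above can be sketched numerically. Below is a minimal, hypothetical Python illustration (not NiFi or Kafka client code; the function name and node labels are invented) of an even, range-style spread of partitions across consumer nodes:

```python
def spread_partitions(num_partitions, nodes):
    """Divide partitions [0, num_partitions) across nodes as evenly as
    possible, giving each node a contiguous range (range-style spread)."""
    assignment = {}
    base, extra = divmod(num_partitions, len(nodes))
    start = 0
    for i, node in enumerate(nodes):
        # The first `extra` nodes absorb one additional partition each.
        count = base + (1 if i < extra else 0)
        assignment[node] = list(range(start, start + count))
        start += count
    return assignment

# Mirrors the documented example: Node 1 gets 0-2, Node 2 gets 3-5, Node 3 gets 6-7.
print(spread_partitions(8, ["node-1", "node-2", "node-3"]))
```

Which node ends up with which range is effectively arbitrary from the user's point of view, which is the caveat the documentation is making: if a node drops out, its range is redistributed unless a static mapping is configured.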
##########
File path:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
of the Kafka message.
</p>
+ <h2>Consumer Partition Assignment</h2>
+ <p>
+ By default, this processor will subscribe to one or more Kafka topics in such a way that the topics to consume from are randomly
+ assigned to to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
+ NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5.
+ Node 3 will then be assigned partitions 6 and 7.
+ </p>
+
+ <p>
+ In this scenario, if Node 3 somehow fails or stops pulling data from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
+ For most use cases, this is desirable. It provides fault tolerance and allows the remaining nodes to pick up the slack. However, there are cases
+ where this is undesirable.
+ </p>
+
+ <p>
+ One such case is when using NiFi to consume Change Data Capture (CDC) data from Kafka. Consider again the above scenario. Consider that Node 3
+ has pulled 1,000 messages from Kafka but has not yet delivered them to their final destination. NiFi is then stopped and restarted, and that takes
+ 15 minutes to complete. In the meantime, Partitions 6 and 7 have been reassigned to the other nodes. Those nodes then proceeded to pull data from
+ Kafka and deliver it to the desired destination. After 15 minutes, Node 3 rejoins the cluster and then continues to deliver its 1,000 messages that
+ it has already pulled from Kafka to the destination system. Now, those records have been delivered out of order.
+ </p>
+
+ <p>
+ The solution for this, then, is to assign partitions statically instead of dynamically. In this way, we can assign Partitions 6 and 7 to Node 3 specifically.
+ Then, if Node 3 is restarted, the other nodes will not pull data from Partitions 6 and 7. The data will remain queued in Kafka until Node 3 is restarted. By
+ using this approach, we can ensure that the data that already was pulled can be processed (assuming First In First Out Prioritizers are used) before newer messages
+ are handled.
+ </p>
+
+ <p>
+ In order to provide a static mapping of node to Kafka partition(s), one or more user-defined properties must be added using the naming scheme
+ <code>partitions.<hostname></code> with the value being a comma-separated list of Kafka partitions to use. For example,
+ <code>partitions.nifi-01=0, 3, 6, 9</code>, <code>partitions.nifi-02=1, 4, 7, 10</code>, and <code>partitions.nifi-03=2, 5, 8, 11</code>.
+ The hostname that is used can be the fully qualified hostname, the "simple" hostname, or the IP address. There must be an entry for each node in
+ the cluster, or the Processor will become invalid. If it is desirable for a node to not have any partitions assigned to it, a Property may be
+ added for the hostname with an empty string as the value.
+ </p>
+
+ <p>
+ In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern."
Review comment:
```suggestion
In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern".
```
##########
File path:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
of the Kafka message.
</p>
+ <h2>Consumer Partition Assignment</h2>
+ <p>
+ By default, this processor will subscribe to one or more Kafka topics in such a way that the topics to consume from are randomly
+ assigned to to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
Review comment:
```suggestion
assigned to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
```
##########
File path:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
written to a FlowFile by serializing the message with the
configured Record Writer.
</p>
+ <h2>Consumer Partition Assignment</h2>
+ <p>
+ By default, this processor will subscribe to one or more Kafka topics in such a way that the topics to consume from are randomly
+ assigned to to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
Review comment:
```suggestion
assigned to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
```
##########
File path:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
of the Kafka message.
</p>
+ <h2>Consumer Partition Assignment</h2>
+ <p>
+ By default, this processor will subscribe to one or more Kafka topics in such a way that the topics to consume from are randomly
+ assigned to to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
Review comment:
```suggestion
assigned to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
```
##########
File path:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
written to a FlowFile by serializing the message with the
configured Record Writer.
</p>
+ <h2>Consumer Partition Assignment</h2>
+ <p>
+ By default, this processor will subscribe to one or more Kafka topics in such a way that the topics to consume from are randomly
+ assigned to to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
+ NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5.
+ Node 3 will then be assigned partitions 6 and 7.
+ </p>
+
+ <p>
+ In this scenario, if Node 3 somehow fails or stops pulling data from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
+ For most use cases, this is desirable. It provides fault tolerance and allows the remaining nodes to pick up the slack. However, there are cases
+ where this is undesirable.
+ </p>
+
+ <p>
+ One such case is when using NiFi to consume Change Data Capture (CDC) data from Kafka. Consider again the above scenario. Consider that Node 3
+ has pulled 1,000 messages from Kafka but has not yet delivered them to their final destination. NiFi is then stopped and restarted, and that takes
+ 15 minutes to complete. In the meantime, Partitions 6 and 7 have been reassigned to the other nodes. Those nodes then proceeded to pull data from
+ Kafka and deliver it to the desired destination. After 15 minutes, Node 3 rejoins the cluster and then continues to deliver its 1,000 messages that
+ it has already pulled from Kafka to the destination system. Now, those records have been delivered out of order.
+ </p>
+
+ <p>
+ The solution for this, then, is to assign partitions statically instead of dynamically. In this way, we can assign Partitions 6 and 7 to Node 3 specifically.
+ Then, if Node 3 is restarted, the other nodes will not pull data from Partitions 6 and 7. The data will remain queued in Kafka until Node 3 is restarted. By
+ using this approach, we can ensure that the data that already was pulled can be processed (assuming First In First Out Prioritizers are used) before newer messages
+ are handled.
+ </p>
+
+ <p>
+ In order to provide a static mapping of node to Kafka partition(s), one or more user-defined properties must be added using the naming scheme
+ <code>partitions.<hostname></code> with the value being a comma-separated list of Kafka partitions to use. For example,
+ <code>partitions.nifi-01=0, 3, 6, 9</code>, <code>partitions.nifi-02=1, 4, 7, 10</code>, and <code>partitions.nifi-03=2, 5, 8, 11</code>.
+ The hostname that is used can be the fully qualified hostname, the "simple" hostname, or the IP address. There must be an entry for each node in
+ the cluster, or the Processor will become invalid. If it is desirable for a node to not have any partitions assigned to it, a Property may be
+ added for the hostname with an empty string as the value.
+ </p>
+
+ <p>
+ In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern."
Review comment:
```suggestion
In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern".
```
##########
File path:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
of the Kafka message.
</p>
+ <h2>Consumer Partition Assignment</h2>
+ <p>
+ By default, this processor will subscribe to one or more Kafka topics in such a way that the topics to consume from are randomly
+ assigned to to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
+ NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5.
+ Node 3 will then be assigned partitions 6 and 7.
+ </p>
+
+ <p>
+ In this scenario, if Node 3 somehow fails or stops pulling data from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
+ For most use cases, this is desirable. It provides fault tolerance and allows the remaining nodes to pick up the slack. However, there are cases
+ where this is undesirable.
+ </p>
+
+ <p>
+ One such case is when using NiFi to consume Change Data Capture (CDC) data from Kafka. Consider again the above scenario. Consider that Node 3
+ has pulled 1,000 messages from Kafka but has not yet delivered them to their final destination. NiFi is then stopped and restarted, and that takes
+ 15 minutes to complete. In the meantime, Partitions 6 and 7 have been reassigned to the other nodes. Those nodes then proceeded to pull data from
+ Kafka and deliver it to the desired destination. After 15 minutes, Node 3 rejoins the cluster and then continues to deliver its 1,000 messages that
+ it has already pulled from Kafka to the destination system. Now, those records have been delivered out of order.
+ </p>
+
+ <p>
+ The solution for this, then, is to assign partitions statically instead of dynamically. In this way, we can assign Partitions 6 and 7 to Node 3 specifically.
+ Then, if Node 3 is restarted, the other nodes will not pull data from Partitions 6 and 7. The data will remain queued in Kafka until Node 3 is restarted. By
+ using this approach, we can ensure that the data that already was pulled can be processed (assuming First In First Out Prioritizers are used) before newer messages
+ are handled.
+ </p>
+
+ <p>
+ In order to provide a static mapping of node to Kafka partition(s), one or more user-defined properties must be added using the naming scheme
+ <code>partitions.<hostname></code> with the value being a comma-separated list of Kafka partitions to use. For example,
+ <code>partitions.nifi-01=0, 3, 6, 9</code>, <code>partitions.nifi-02=1, 4, 7, 10</code>, and <code>partitions.nifi-03=2, 5, 8, 11</code>.
+ The hostname that is used can be the fully qualified hostname, the "simple" hostname, or the IP address. There must be an entry for each node in
+ the cluster, or the Processor will become invalid. If it is desirable for a node to not have any partitions assigned to it, a Property may be
+ added for the hostname with an empty string as the value.
+ </p>
+
+ <p>
+ In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern."
Review comment:
```suggestion
In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern".
```
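As a companion to the `partitions.<hostname>` property scheme these hunks document, here is a hypothetical Python sketch (not NiFi's actual validation code; the function name and property dict are invented for illustration) of the documented rules: every cluster node needs an entry, an empty value means no partitions for that node, and no partition may be claimed by two nodes:

```python
def parse_partition_assignments(properties, cluster_hostnames):
    """Return {hostname: [partition, ...]} from user-defined properties
    named 'partitions.<hostname>'. Raises ValueError where the docs say
    the Processor would become invalid."""
    assignments = {}
    for host in cluster_hostnames:
        key = "partitions." + host
        if key not in properties:
            # Docs: every node in the cluster must have an entry.
            raise ValueError("No 'partitions.%s' entry; Processor would be invalid" % host)
        value = properties[key].strip()
        # Docs: an empty string means the node gets no partitions.
        assignments[host] = [int(p) for p in value.split(",")] if value else []

    # A partition assigned to two nodes would be consumed twice; reject it.
    claimed = {}
    for host, parts in assignments.items():
        for p in parts:
            if p in claimed:
                raise ValueError("Partition %d assigned to both %s and %s" % (p, claimed[p], host))
            claimed[p] = host
    return assignments

# Mirrors the 12-partition example from the documentation.
props = {
    "partitions.nifi-01": "0, 3, 6, 9",
    "partitions.nifi-02": "1, 4, 7, 10",
    "partitions.nifi-03": "2, 5, 8, 11",
}
print(parse_partition_assignments(props, ["nifi-01", "nifi-02", "nifi-03"]))
```

The hostname-matching subtlety the docs call out (FQDN vs. "simple" hostname vs. IP) is deliberately left out of this sketch; it only models the entry-per-node and empty-value rules.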