pvillard31 commented on a change in pull request #4672:
URL: https://github.com/apache/nifi/pull/4672#discussion_r526136155



##########
File path: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
             written to a FlowFile by serializing the message with the 
configured Record Writer.
         </p>
 
+        <h2>Consumer Partition Assignment</h2>
+        <p>
+            By default, this processor will subscribe to one or more Kafka 
topics in such a way that the topics to consume from are randomly
+            assigned to to the nodes in the NiFi cluster. Consider a scenario 
where a single Kafka topic has 8 partitions and the consuming

Review comment:
       ```suggestion
               assigned to the nodes in the NiFi cluster. Consider a scenario 
where a single Kafka topic has 8 partitions and the consuming
   ```

##########
File path: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
             written to a FlowFile by serializing the message with the 
configured Record Writer.
         </p>
 
+        <h2>Consumer Partition Assignment</h2>
+        <p>
+            By default, this processor will subscribe to one or more Kafka 
topics in such a way that the topics to consume from are randomly
+            assigned to to the nodes in the NiFi cluster. Consider a scenario 
where a single Kafka topic has 8 partitions and the consuming
+            NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned 
partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5.
+            Node 3 will then be assigned partitions 6 and 7.
+        </p>
+
+        <p>
+            In this scenario, if Node 3 somehow fails or stops pulling data 
from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
+            For most use cases, this is desirable. It provides fault tolerance 
and allows the remaining nodes to pick up the slack. However, there are cases
+            where this is undesirable.
+        </p>
+
+        <p>
+            One such case is when using NiFi to consume Change Data Capture 
(CDC) data from Kafka. Consider again the above scenario. Consider that Node 3
+            has pulled 1,000 messages from Kafka but has not yet delivered 
them to their final destination. NiFi is then stopped and restarted, and that 
takes
+            15 minutes to complete. In the meantime, Partitions 6 and 7 have 
been reassigned to the other nodes. Those nodes then proceeded to pull data from
+            Kafka and deliver it to the desired destination. After 15 minutes, 
Node 3 rejoins the cluster and then continues to deliver its 1,000 messages that
+            it has already pulled from Kafka to the destination system. Now, 
those records have been delivered out of order.
+        </p>
+
+        <p>
+            The solution for this, then, is to assign partitions statically 
instead of dynamically. In this way, we can assign Partitions 6 and 7 to Node 3 
specifically.
+            Then, if Node 3 is restarted, the other nodes will not pull data 
from Partitions 6 and 7. The data will remain queued in Kafka until Node 3 is 
restarted. By
+            using this approach, we can ensure that the data that already was 
pulled can be processed (assuming First In First Out Prioritizers are used) 
before newer messages
+            are handled.
+        </p>
+
+        <p>
+            In order to provide a static mapping of node to Kafka 
partition(s), one or more user-defined properties must be added using the 
naming scheme
+            <code>partitions.&lt;hostname&gt;</code> with the value being a 
comma-separated list of Kafka partitions to use. For example,
+            <code>partitions.nifi-01=0, 3, 6, 9</code>, 
<code>partitions.nifi-02=1, 4, 7, 10</code>, and <code>partitions.nifi-03=2, 5, 
8, 11</code>.
+            The hostname that is used can be the fully qualified hostname, the 
"simple" hostname, or the IP address. There must be an entry for each node in
+            the cluster, or the Processor will become invalid. If it is 
desirable for a node to not have any partitions assigned to it, a Property may 
be
+            added for the hostname with an empty string as the value.
+        </p>
+
+        <p>
+            In order to use a static mapping of Kafka partitions, the "Topic 
Name Format" must be set to "names" rather than "pattern."

Review comment:
       ```suggestion
               In order to use a static mapping of Kafka partitions, the "Topic 
Name Format" must be set to "names" rather than "pattern".
   ```

##########
File path: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
             of the Kafka message.
         </p>
 
+        <h2>Consumer Partition Assignment</h2>
+        <p>
+            By default, this processor will subscribe to one or more Kafka 
topics in such a way that the topics to consume from are randomly
+            assigned to to the nodes in the NiFi cluster. Consider a scenario 
where a single Kafka topic has 8 partitions and the consuming
+            NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned 
partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5.
+            Node 3 will then be assigned partitions 6 and 7.
+        </p>
+
+        <p>
+            In this scenario, if Node 3 somehow fails or stops pulling data 
from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
+            For most use cases, this is desirable. It provides fault tolerance 
and allows the remaining nodes to pick up the slack. However, there are cases
+            where this is undesirable.
+        </p>
+
+        <p>
+            One such case is when using NiFi to consume Change Data Capture 
(CDC) data from Kafka. Consider again the above scenario. Consider that Node 3
+            has pulled 1,000 messages from Kafka but has not yet delivered 
them to their final destination. NiFi is then stopped and restarted, and that 
takes
+            15 minutes to complete. In the meantime, Partitions 6 and 7 have 
been reassigned to the other nodes. Those nodes then proceeded to pull data from
+            Kafka and deliver it to the desired destination. After 15 minutes, 
Node 3 rejoins the cluster and then continues to deliver its 1,000 messages that
+            it has already pulled from Kafka to the destination system. Now, 
those records have been delivered out of order.
+        </p>
+
+        <p>
+            The solution for this, then, is to assign partitions statically 
instead of dynamically. In this way, we can assign Partitions 6 and 7 to Node 3 
specifically.
+            Then, if Node 3 is restarted, the other nodes will not pull data 
from Partitions 6 and 7. The data will remain queued in Kafka until Node 3 is 
restarted. By
+            using this approach, we can ensure that the data that already was 
pulled can be processed (assuming First In First Out Prioritizers are used) 
before newer messages
+            are handled.
+        </p>
+
+        <p>
+            In order to provide a static mapping of node to Kafka 
partition(s), one or more user-defined properties must be added using the 
naming scheme
+            <code>partitions.&lt;hostname&gt;</code> with the value being a 
comma-separated list of Kafka partitions to use. For example,
+            <code>partitions.nifi-01=0, 3, 6, 9</code>, 
<code>partitions.nifi-02=1, 4, 7, 10</code>, and <code>partitions.nifi-03=2, 5, 
8, 11</code>.
+            The hostname that is used can be the fully qualified hostname, the 
"simple" hostname, or the IP address. There must be an entry for each node in
+            the cluster, or the Processor will become invalid. If it is 
desirable for a node to not have any partitions assigned to it, a Property may 
be
+            added for the hostname with an empty string as the value.
+        </p>
+
+        <p>
+            In order to use a static mapping of Kafka partitions, the "Topic 
Name Format" must be set to "names" rather than "pattern."

Review comment:
       ```suggestion
               In order to use a static mapping of Kafka partitions, the "Topic 
Name Format" must be set to "names" rather than "pattern".
   ```

##########
File path: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
             of the Kafka message.
         </p>
 
+        <h2>Consumer Partition Assignment</h2>
+        <p>
+            By default, this processor will subscribe to one or more Kafka 
topics in such a way that the topics to consume from are randomly
+            assigned to to the nodes in the NiFi cluster. Consider a scenario 
where a single Kafka topic has 8 partitions and the consuming

Review comment:
       ```suggestion
               assigned to the nodes in the NiFi cluster. Consider a scenario 
where a single Kafka topic has 8 partitions and the consuming
   ```

##########
File path: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
             written to a FlowFile by serializing the message with the 
configured Record Writer.
         </p>
 
+        <h2>Consumer Partition Assignment</h2>
+        <p>
+            By default, this processor will subscribe to one or more Kafka 
topics in such a way that the topics to consume from are randomly
+            assigned to to the nodes in the NiFi cluster. Consider a scenario 
where a single Kafka topic has 8 partitions and the consuming

Review comment:
       ```suggestion
               assigned to the nodes in the NiFi cluster. Consider a scenario 
where a single Kafka topic has 8 partitions and the consuming
   ```

##########
File path: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
             of the Kafka message.
         </p>
 
+        <h2>Consumer Partition Assignment</h2>
+        <p>
+            By default, this processor will subscribe to one or more Kafka 
topics in such a way that the topics to consume from are randomly
+            assigned to to the nodes in the NiFi cluster. Consider a scenario 
where a single Kafka topic has 8 partitions and the consuming

Review comment:
       ```suggestion
               assigned to the nodes in the NiFi cluster. Consider a scenario 
where a single Kafka topic has 8 partitions and the consuming
   ```

##########
File path: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
             written to a FlowFile by serializing the message with the 
configured Record Writer.
         </p>
 
+        <h2>Consumer Partition Assignment</h2>
+        <p>
+            By default, this processor will subscribe to one or more Kafka 
topics in such a way that the topics to consume from are randomly
+            assigned to to the nodes in the NiFi cluster. Consider a scenario 
where a single Kafka topic has 8 partitions and the consuming
+            NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned 
partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5.
+            Node 3 will then be assigned partitions 6 and 7.
+        </p>
+
+        <p>
+            In this scenario, if Node 3 somehow fails or stops pulling data 
from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
+            For most use cases, this is desirable. It provides fault tolerance 
and allows the remaining nodes to pick up the slack. However, there are cases
+            where this is undesirable.
+        </p>
+
+        <p>
+            One such case is when using NiFi to consume Change Data Capture 
(CDC) data from Kafka. Consider again the above scenario. Consider that Node 3
+            has pulled 1,000 messages from Kafka but has not yet delivered 
them to their final destination. NiFi is then stopped and restarted, and that 
takes
+            15 minutes to complete. In the meantime, Partitions 6 and 7 have 
been reassigned to the other nodes. Those nodes then proceeded to pull data from
+            Kafka and deliver it to the desired destination. After 15 minutes, 
Node 3 rejoins the cluster and then continues to deliver its 1,000 messages that
+            it has already pulled from Kafka to the destination system. Now, 
those records have been delivered out of order.
+        </p>
+
+        <p>
+            The solution for this, then, is to assign partitions statically 
instead of dynamically. In this way, we can assign Partitions 6 and 7 to Node 3 
specifically.
+            Then, if Node 3 is restarted, the other nodes will not pull data 
from Partitions 6 and 7. The data will remain queued in Kafka until Node 3 is 
restarted. By
+            using this approach, we can ensure that the data that already was 
pulled can be processed (assuming First In First Out Prioritizers are used) 
before newer messages
+            are handled.
+        </p>
+
+        <p>
+            In order to provide a static mapping of node to Kafka 
partition(s), one or more user-defined properties must be added using the 
naming scheme
+            <code>partitions.&lt;hostname&gt;</code> with the value being a 
comma-separated list of Kafka partitions to use. For example,
+            <code>partitions.nifi-01=0, 3, 6, 9</code>, 
<code>partitions.nifi-02=1, 4, 7, 10</code>, and <code>partitions.nifi-03=2, 5, 
8, 11</code>.
+            The hostname that is used can be the fully qualified hostname, the 
"simple" hostname, or the IP address. There must be an entry for each node in
+            the cluster, or the Processor will become invalid. If it is 
desirable for a node to not have any partitions assigned to it, a Property may 
be
+            added for the hostname with an empty string as the value.
+        </p>
+
+        <p>
+            In order to use a static mapping of Kafka partitions, the "Topic 
Name Format" must be set to "names" rather than "pattern."

Review comment:
       ```suggestion
               In order to use a static mapping of Kafka partitions, the "Topic 
Name Format" must be set to "names" rather than "pattern".
   ```

##########
File path: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0/additionalDetails.html
##########
@@ -29,6 +29,48 @@ <h2>Description</h2>
             of the Kafka message.
         </p>
 
+        <h2>Consumer Partition Assignment</h2>
+        <p>
+            By default, this processor will subscribe to one or more Kafka 
topics in such a way that the topics to consume from are randomly
+            assigned to to the nodes in the NiFi cluster. Consider a scenario 
where a single Kafka topic has 8 partitions and the consuming
+            NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned 
partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5.
+            Node 3 will then be assigned partitions 6 and 7.
+        </p>
+
+        <p>
+            In this scenario, if Node 3 somehow fails or stops pulling data 
from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
+            For most use cases, this is desirable. It provides fault tolerance 
and allows the remaining nodes to pick up the slack. However, there are cases
+            where this is undesirable.
+        </p>
+
+        <p>
+            One such case is when using NiFi to consume Change Data Capture 
(CDC) data from Kafka. Consider again the above scenario. Consider that Node 3
+            has pulled 1,000 messages from Kafka but has not yet delivered 
them to their final destination. NiFi is then stopped and restarted, and that 
takes
+            15 minutes to complete. In the meantime, Partitions 6 and 7 have 
been reassigned to the other nodes. Those nodes then proceeded to pull data from
+            Kafka and deliver it to the desired destination. After 15 minutes, 
Node 3 rejoins the cluster and then continues to deliver its 1,000 messages that
+            it has already pulled from Kafka to the destination system. Now, 
those records have been delivered out of order.
+        </p>
+
+        <p>
+            The solution for this, then, is to assign partitions statically 
instead of dynamically. In this way, we can assign Partitions 6 and 7 to Node 3 
specifically.
+            Then, if Node 3 is restarted, the other nodes will not pull data 
from Partitions 6 and 7. The data will remain queued in Kafka until Node 3 is 
restarted. By
+            using this approach, we can ensure that the data that already was 
pulled can be processed (assuming First In First Out Prioritizers are used) 
before newer messages
+            are handled.
+        </p>
+
+        <p>
+            In order to provide a static mapping of node to Kafka 
partition(s), one or more user-defined properties must be added using the 
naming scheme
+            <code>partitions.&lt;hostname&gt;</code> with the value being a 
comma-separated list of Kafka partitions to use. For example,
+            <code>partitions.nifi-01=0, 3, 6, 9</code>, 
<code>partitions.nifi-02=1, 4, 7, 10</code>, and <code>partitions.nifi-03=2, 5, 
8, 11</code>.
+            The hostname that is used can be the fully qualified hostname, the 
"simple" hostname, or the IP address. There must be an entry for each node in
+            the cluster, or the Processor will become invalid. If it is 
desirable for a node to not have any partitions assigned to it, a Property may 
be
+            added for the hostname with an empty string as the value.
+        </p>
+
+        <p>
+            In order to use a static mapping of Kafka partitions, the "Topic 
Name Format" must be set to "names" rather than "pattern."

Review comment:
       ```suggestion
               In order to use a static mapping of Kafka partitions, the "Topic 
Name Format" must be set to "names" rather than "pattern".
   ```
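
For readers following the static-assignment paragraph in the diffs above, here is a minimal sketch of how the `partitions.<hostname>` properties could be read and checked. This is not NiFi's implementation: the function name `parse_partition_assignments` and its arguments are hypothetical, and it only mirrors the rules stated in the documentation text (comma-separated partition list, one entry required per node, empty string meaning no partitions).

```python
PREFIX = "partitions."  # naming scheme quoted in the documentation under review


def parse_partition_assignments(properties, cluster_hostnames):
    """Hypothetical helper: map each hostname to its list of Kafka partitions.

    properties        -- dict of user-defined property names to string values
    cluster_hostnames -- hostnames of every node in the cluster
    """
    assignments = {}
    for name, value in properties.items():
        if not name.startswith(PREFIX):
            continue  # not a partition-assignment property
        hostname = name[len(PREFIX):]
        # An empty value means the node is deliberately given no partitions.
        assignments[hostname] = [
            int(part.strip()) for part in value.split(",") if part.strip()
        ]

    # The documentation says every node needs an entry, otherwise the
    # processor is invalid; here that is modelled as a ValueError.
    missing = [host for host in cluster_hostnames if host not in assignments]
    if missing:
        raise ValueError("no partitions.<hostname> entry for: " + ", ".join(missing))
    return assignments


if __name__ == "__main__":
    example = {
        "partitions.nifi-01": "0, 3, 6, 9",
        "partitions.nifi-02": "1, 4, 7, 10",
        "partitions.nifi-03": "2, 5, 8, 11",
    }
    print(parse_partition_assignments(example, ["nifi-01", "nifi-02", "nifi-03"]))
```

Raising an error for a missing node, rather than silently skipping it, follows the documented behaviour that the processor becomes invalid when a node has no entry.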




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

