[ 
https://issues.apache.org/jira/browse/NIFI-13408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

André Alves updated NIFI-13408:
-------------------------------
    Description: 
The main goal is to enhance the ConsumeKafka_2_6 processor so that it can dynamically 
change the Kafka topic based on the attributes of incoming FlowFiles, making the 
processor more flexible and adaptable to varying data flows. To implement this new 
feature, the following changes were made:
 * Updated the processor to accept incoming FlowFiles.
 * Enabled the "Topic Name(s)" property to support Expression Language, 
allowing it to take values from FlowFile attributes.
 * Added a new entry named {{flowNames}} to the "Topic Name Format" property. When 
this entry is selected, the processor resolves the Kafka topic by evaluating the 
Expression Language in the "Topic Name(s)" property against the attributes of the 
incoming FlowFile.
 * Created a JUnit test to cover this new feature.
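To make the {{flowNames}} behavior concrete, here is a minimal sketch of the topic-resolution idea in plain Java. This is an illustration only, not NiFi's actual Expression Language engine or the processor's implementation; the class name, helper method, and the {{kafka.topic}} attribute name are hypothetical.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of the flowNames idea: substitute ${attr} references in the
// configured "Topic Name(s)" value with FlowFile attribute values.
// Hypothetical helper; NiFi's real Expression Language is far richer.
public class TopicResolver {
    private static final Pattern EL = Pattern.compile("\\$\\{([^}]+)\\}");

    static String resolveTopic(String topicProperty, Map<String, String> attributes) {
        Matcher m = EL.matcher(topicProperty);
        StringBuilder sb = new StringBuilder();
        while (m.find()) {
            // Missing attributes resolve to an empty string in this sketch.
            String value = attributes.getOrDefault(m.group(1), "");
            m.appendReplacement(sb, Matcher.quoteReplacement(value));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> attrs = Map.of("kafka.topic", "alarms-topic-42");
        // Prints: alarms-topic-42
        System.out.println(resolveTopic("${kafka.topic}", attrs));
    }
}
```

With {{Topic Name Format}} set to {{flowNames}} and {{Topic Name(s)}} set to {{$\{kafka.topic\}}}, an incoming FlowFile carrying a {{kafka.topic}} attribute would steer the consumer to that topic.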

The ConsumeKafka_2_6 processor retains its original behavior even with 
FlowFile support: it can still consume Kafka messages whether or not an incoming 
connection is present, without needing to be triggered by a FlowFile. The key 
difference is its new ability to process incoming FlowFiles, which enables dynamic 
topic changes.

----
There are use cases where the Kafka topic name is not known in advance and may 
change dynamically based on various factors. Therefore, it is essential for 
Apache NiFi to support dynamic Kafka topic management to accommodate these 
scenarios. This feature would empower NiFi users to handle complex, evolving 
data pipelines more effectively, making their systems more resilient and 
adaptable to changing data requirements.

To illustrate the importance of this feature, here are some use cases across 
different domains:
 # {*}Fraud Detection in Financial Services{*}: In the financial sector, 
transaction streams can be dynamically routed to different Kafka topics based 
on predefined rules, such as transaction value thresholds. Suspicious 
transactions are directed to a specific topic for further analysis, while 
regular transactions are routed to another topic. This setup allows for 
specialized processing and monitoring, improving the system's responsiveness 
and accuracy without requiring application restarts or reconfigurations.
 # {*}Order Management in Retail{*}: In retail, dynamic routing helps manage 
streams of real-time orders. Orders can be routed based on customer loyalty 
levels, retrieved from an external database. This ensures each order is 
processed according to the customer’s status, facilitating personalized 
services and targeted marketing efforts.
 # {*}IoT Applications{*}: In IoT applications, new Kafka topics can be 
dynamically created for each new sensor that comes online, routing sensor data 
to the appropriate topic for processing. This capability is crucial for 
handling the dynamic nature of IoT environments.

For more detailed examples and use cases, please refer to [this 
link|https://www.confluent.io/blog/putting-events-in-their-place-with-dynamic-routing/].

A specific example that highlights the advantage of implementing this feature 
is the Network Services Platform (NSP) from Nokia. NSP is an open, programmable 
platform that unifies service automation, network optimization, and dynamic 
assurance. It offers a comprehensive suite of applications, northbound APIs, 
and an automation framework to manage and control multivendor IP/optical wide 
area networks.

The NSP provides Fault Management REST API services for performing management 
operations on alarms, and uses the NSP Kafka Notification Service to deliver near 
real-time notifications about new, modified, and deleted NSP alarms. These 
operations require authentication and a subscription. According to the 
[API 
documentation|https://www.postman.com/warped-escape-258288/workspace/starhub-srv6/request/4891045-af121b1f-5cf3-418e-83cc-45d132f3a132],
 the Kafka topic is known only after successful authentication and subscription 
creation; the topic name is not known in advance.

To integrate NiFi with this platform, we can design a flow where the Kafka 
consumer can handle dynamic topics effectively.
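The upstream step of such a flow would extract the topic name from the subscription response and write it to a FlowFile attribute that the consumer then references. The sketch below shows that extraction in plain Java; the {{topicId}} field name and the response shape are assumptions for illustration only, so consult the NSP API documentation for the actual payload.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: pull the Kafka topic out of an NSP subscription
// response so it can be placed in a FlowFile attribute (e.g. kafka.topic)
// and referenced by ConsumeKafka_2_6 via Expression Language.
public class SubscriptionTopicExtractor {
    private static final Pattern TOPIC = Pattern.compile("\"topicId\"\\s*:\\s*\"([^\"]+)\"");

    static Optional<String> extractTopic(String subscriptionResponse) {
        Matcher m = TOPIC.matcher(subscriptionResponse);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Assumed response shape, for illustration only.
        String response = "{\"response\":{\"data\":{\"topicId\":\"ns-eg-fm-alarms\"}}}";
        // Prints: ns-eg-fm-alarms
        extractTopic(response).ifPresent(System.out::println);
    }
}
```

In a real flow this extraction would typically be done with EvaluateJsonPath after an InvokeHTTP call, rather than custom code.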

!flow.png|width=759,height=387!

 

By implementing support for dynamic Kafka topics, Apache NiFi will 
significantly enhance its capability to manage real-time data flows, providing 
greater flexibility and scalability for users across various industries.



> Dynamically Update Kafka Topic in ConsumeKafka_2_6 Processor
> ------------------------------------------------------------
>
>                 Key: NIFI-13408
>                 URL: https://issues.apache.org/jira/browse/NIFI-13408
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: André Alves
>            Priority: Major
>         Attachments: flow.png
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
