[ 
https://issues.apache.org/jira/browse/NIFI-13408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860906#comment-17860906
 ] 

Joe Witt commented on NIFI-13408:
---------------------------------

[~andrealves] Thanks for the description.  The key point here is that you see a 
need for a more dynamic way of discovering new topics we should monitor.

Today we offer consumption based on user-supplied topic names OR a 
user-supplied topic name pattern.

We do not take incoming flowfiles for this processor: it is a source processor 
and was not designed for that. To accomplish what you want, I suggest a 
different approach.

Further, whatever is done needs to be consistent with both ConsumeKafka and 
ConsumeKafkaRecord.

The key question here is "How to know about topics of interest that exist (as 
well as new ones that arrive while running)".

Consider an approach whereby you offer a third strategy such as 'dynamic', and 
if that is chosen the user must provide a 'Topic Provider Controller Service' 
which looks up these values. You can then have as many such implementations as 
you need, perhaps calling some API that tells you which topics you want, or a 
database, or whatever.

That service then generates the list of topics, the rest of the code possibly 
remains unchanged, and notably we do not change the user-facing 
logic/semantics of this very battle-tested component.

Thanks

> Dynamically Update Kafka Topic in ConsumeKafka_2_6 Processor
> ------------------------------------------------------------
>
>                 Key: NIFI-13408
>                 URL: https://issues.apache.org/jira/browse/NIFI-13408
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: André Alves
>            Priority: Major
>         Attachments: flow.png
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> The main goal is to enhance the ConsumeKafka_2_6 processor to dynamically 
> change the Kafka topic based on incoming flowfile attributes. This allows the 
> processor to be more flexible and adaptable to varying data flows by updating 
> the Kafka topic as specified by the attributes in the flowfiles. To implement 
> this new feature, the following changes were made:
>  * Updated the processor to be able to receive incoming flowfiles.
>  * Enabled the "Topic Name(s)" property to support expression language, 
> allowing it to take values from flowfile attributes.
>  * Added a new entry named flowNames to the "Topic Name Format" property. 
> When this entry is chosen, the processor will determine the Kafka topic based 
> on the flowfile attribute defined in the "Topic Name(s)" property using 
> expression language.
>  * Created a JUnit test to cover this new feature.
> The ConsumeKafka_2_6 processor maintains its original behavior even with 
> flowfile support. It can collect Kafka messages regardless of whether an 
> incoming connection is present and without needing to be triggered by a 
> flowfile. The key difference is its new capability to support flowfile 
> processing, allowing for dynamic topic changes.
> ----
> There are use cases where the Kafka topic name is not known in advance and 
> may change dynamically based on various factors. Therefore, it is essential 
> for Apache NiFi to support dynamic Kafka topic management to accommodate 
> these scenarios. This feature would empower NiFi users to handle complex, 
> evolving data pipelines more effectively, making their systems more resilient 
> and adaptable to changing data requirements.
> To illustrate the importance of this feature, here are some use cases across 
> different domains:
>  # {*}Fraud Detection in Financial Services{*}: In the financial sector, 
> transaction streams can be dynamically routed to different Kafka topics based 
> on predefined rules, such as transaction value thresholds. Suspicious 
> transactions are directed to a specific topic for further analysis, while 
> regular transactions are routed to another topic. This setup allows for 
> specialized processing and monitoring, improving the system's responsiveness 
> and accuracy without requiring application restarts or reconfigurations.
>  # {*}Order Management in Retail{*}: In retail, dynamic routing helps manage 
> streams of real-time orders. Orders can be routed based on customer loyalty 
> levels, retrieved from an external database. This ensures each order is 
> processed according to the customer’s status, facilitating personalized 
> services and targeted marketing efforts.
>  # {*}IoT Applications{*}: In IoT applications, new Kafka topics can be 
> dynamically created for each new sensor that comes online, routing sensor 
> data to the appropriate topic for processing. This capability is crucial for 
> handling the dynamic nature of IoT environments.
> For more detailed examples and use cases, please refer to [this 
> link|https://www.confluent.io/blog/putting-events-in-their-place-with-dynamic-routing/].
> A specific example that highlights the advantage of implementing this feature 
> is the Network Services Platform (NSP) from Nokia. NSP is an open, 
> programmable platform that unifies service automation, network optimization, 
> and dynamic assurance. It offers a comprehensive suite of applications, 
> northbound APIs, and an automation framework to manage and control 
> multivendor IP/optical wide area networks.
> The NSP uses the Fault Management REST API services to perform management 
> operations on alarms. This product uses the NSP Kafka Notification Service to 
> receive near real-time notifications on new, modified, and deleted NSP 
> alarms. These operations require authentication and subscriptions. According 
> to the [API 
> documentation|https://www.postman.com/warped-escape-258288/workspace/starhub-srv6/request/4891045-af121b1f-5cf3-418e-83cc-45d132f3a132],
>  the Kafka topic is known only after successful authentication and 
> subscription creation. Initially, the topic name is not known.
> To integrate NiFi with this platform, we can design a flow where the Kafka 
> consumer can handle dynamic topics effectively.
> !flow.png|width=759,height=387!
>  
> By implementing support for dynamic Kafka topics, Apache NiFi will 
> significantly enhance its capability to manage real-time data flows, 
> providing greater flexibility and scalability for users across various 
> industries.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
