[ https://issues.apache.org/jira/browse/NIFI-6937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tamas Palfy updated NIFI-6937:
------------------------------
Description:
ReportLineageToAtlas with certain setups can throw the following exception, failing to send reports to Atlas (but retrying infinitely):
{code:java}
Error running task[id=9a705e6d-0168-1000-0000-00001cacda42] due to java.lang.IllegalStateException: Duplicate key {Id='(type: nifi_queue, id: 03f60cff-0aca-4536-a4ff-00eab811600c)', traits=[], values={}}
{code}
The exception is thrown from [https://github.com/apache/nifi/blob/79a7014a95dc3087f88248c732fb1e4ad8e6e128/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java#L390].
The problem is that an Atlas Processor entity has both a _nifi_queue_ and a _nifi_data_ DataSet input entity with the same _qualifiedName_. This can happen when the NiFi processor (*{{P_Subject}}*) that corresponds to the Atlas Processor entity
# has an inbound connection that is represented in Atlas by a _nifi_queue_ entity. (There are multiple ways to enforce this; one is making sure the origin processor of the inbound queue (*{{P_Origin}}*, where *{{P_Origin -> P_Subject}}*) has a connection to another processor as well, like *{{P_Origin -> P_Other}}*, so the flow looks like this:
{code:java}
          /->P_Subject
         /
P_Origin
         \
          \->P_Other
{code}
# also generates an input (CREATE, RECEIVE or FETCH) provenance event on its own and does not have a special input (like _fs_path_ or _hive_table_), but uses the generic _nifi_data_ Atlas type to represent its input (called an "unknown" processor in the documentation of the reporting task).

See the attached [#atlas_duplicate_key.xml] for an example flow template. Here _InvokeHTTP_ has an input _nifi_queue_ entity in Atlas (see the explanation above; for more details see the Path Separation Logic section in the reporting task docs). Its _qualifiedName_ is _processorUUID@clustername_ (derived from the next processor's UUID, so _InvokeHTTP_'s UUID in this case).
It also sends the incoming flowfile in the HTTP request and creates another flowfile from the HTTP response, which generates a FETCH event, which in turn generates a _nifi_data_ entity in Atlas. Its _qualifiedName_ is also _processorUUID@clustername_ (using the UUID of the processor that generates the event, so _InvokeHTTP_'s UUID).
These two entities have the same _qualifiedName_, causing the duplicate key error.
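The "Duplicate key" failure described above is characteristic of Java's {{Collectors.toMap}}, which throws {{java.lang.IllegalStateException}} when two stream elements map to the same key. A minimal sketch (not the actual NotificationSender code; the {{EntityRef}} type and method names are illustrative only) that reproduces the failure with two input entities sharing one _qualifiedName_, and shows the usual remedy of supplying a merge function:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class DuplicateKeyDemo {

    // Stand-in for an Atlas entity reference; the real code uses Atlas types.
    record EntityRef(String typeName, String qualifiedName) {}

    // The colliding inputs described above: a nifi_queue and a nifi_data
    // entity that both end up with qualifiedName processorUUID@clustername.
    static List<EntityRef> sampleInputs() {
        return List.of(
                new EntityRef("nifi_queue", "processorUUID@clustername"),
                new EntityRef("nifi_data",  "processorUUID@clustername"));
    }

    // Keying by qualifiedName with the two-argument Collectors.toMap throws
    // IllegalStateException: Duplicate key ... on the second entity.
    static Map<String, EntityRef> naiveIndex(List<EntityRef> inputs) {
        return inputs.stream().collect(
                Collectors.toMap(EntityRef::qualifiedName, Function.identity()));
    }

    // Passing a merge function resolves duplicates instead of aborting
    // the whole report.
    static Map<String, EntityRef> mergedIndex(List<EntityRef> inputs) {
        return inputs.stream().collect(
                Collectors.toMap(EntityRef::qualifiedName, Function.identity(),
                        (existing, duplicate) -> existing));
    }

    public static void main(String[] args) {
        try {
            naiveIndex(sampleInputs());
        } catch (IllegalStateException e) {
            System.out.println("naive indexing failed: " + e.getMessage());
        }
        System.out.println("merged index size: " + mergedIndex(sampleInputs()).size());
    }
}
```

Whether keeping one entity per _qualifiedName_ is the right fix for NiFi, or whether the two entities should be given distinct qualified names instead, is a design decision for the actual patch; the sketch only demonstrates the failure mode.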
> Fix: ReportLineageToAtlas creates data and queue with same qualified name leading to exception
> ----------------------------------------------------------------------------------------------
>
>                 Key: NIFI-6937
>                 URL: https://issues.apache.org/jira/browse/NIFI-6937
>             Project: Apache NiFi
>          Issue Type: Bug
>            Reporter: Tamas Palfy
>            Priority: Major
>        Attachments: atlas_duplicate_key.xml
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)