[
https://issues.apache.org/jira/browse/NIFI-6937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Tamas Palfy updated NIFI-6937:
------------------------------
Description:
With a certain setup, ReportLineageToAtlas can throw the following exception,
failing to send reports to Atlas (but retrying indefinitely):
{code:java}
Error running task[id=9a705e6d-0168-1000-0000-00001cacda42] due to
java.lang.IllegalStateException: Duplicate key {Id='(type: nifi_queue, id:
03f60cff-0aca-4536-a4ff-00eab811600c)', traits=[], values={}}
{code}
The exception comes from
[https://github.com/apache/nifi/blob/79a7014a95dc3087f88248c732fb1e4ad8e6e128/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NotificationSender.java#L390]
The problem is that an Atlas Processor entity has a _nifi_queue_ and a
_nifi_data_ DataSet input entity with the same _qualifiedName_.
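The duplicate-key failure is the standard behavior of {{Collectors.toMap}}, which throws {{IllegalStateException}} when two entries map to the same key. A minimal, self-contained sketch of that mechanism (illustrative only, not the actual NotificationSender code; the entity pairs are hypothetical):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DuplicateKeyDemo {

    // Two hypothetical (type, qualifiedName) pairs sharing one qualifiedName,
    // mirroring the nifi_queue/nifi_data clash described in this issue.
    static String collide() {
        List<String[]> entities = Arrays.asList(
                new String[] {"nifi_queue", "uuid@clustername"},
                new String[] {"nifi_data", "uuid@clustername"});
        try {
            // No merge function is supplied, so a duplicate key makes
            // Collectors.toMap throw IllegalStateException ("Duplicate key ...").
            entities.stream().collect(Collectors.toMap(e -> e[1], e -> e[0]));
            return "no exception";
        } catch (IllegalStateException e) {
            return "IllegalStateException";
        }
    }

    public static void main(String[] args) {
        System.out.println(collide());
    }
}
```

Supplying a merge function to {{Collectors.toMap}}, or deduplicating entities by _qualifiedName_ beforehand, would avoid the exception; which is appropriate for the reporting task is a separate design question.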
It can happen when the NiFi processor (*{{P_Subject}}*) that corresponds to the
Atlas Processor entity
# has an inbound connection that is represented in Atlas by a _nifi_queue_
entity. There are multiple ways to enforce this; one is to make sure the origin
processor of the inbound queue (*{{P_Origin}}*, where *{{P_Origin ->
P_Subject}}*) has a connection to another processor as well, like *{{P_Origin
-> P_Other}}*, so the flow looks like this:
{code:java}
/->P_Subject
/
P_Origin
\
\->P_Other
{code}
# also generates an input (CREATE, RECEIVE or FETCH) provenance event on its
own and does not have a specific input type (like _fs_path_ or _hive_table_),
so the generic _nifi_data_ Atlas type is used to represent its input (such a
processor is called an "unknown" processor in the documentation of the
reporting task).
See the attached [^atlas_duplicate_key.xml] for an example flow template.
Here _InvokeHTTP_ has an input _nifi_queue_ entity in Atlas (see the
explanation above; for more details, see the Path Separation Logic section in
the reporting task docs). Its _qualifiedName_ is _processorUUID@clustername_
(derived from the next processor's UUID, so _InvokeHTTP_'s UUID in this case).
It also sends the incoming flowfile in the HTTP request and creates another
flowfile from the HTTP response, which generates a FETCH event, which in turn
generates a _nifi_data_ entity in Atlas. Its _qualifiedName_ is also
_processorUUID@clustername_ (using the UUID of the processor that generates
the event, so _InvokeHTTP_'s UUID).
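The collision itself can be sketched in isolation: both entities derive their _qualifiedName_ from the same UUID, so the keys come out identical (hypothetical helper shown below; the real name assembly lives in the reporting task code):

```java
public class QualifiedNameCollision {

    // Hypothetical helper mirroring the processorUUID@clustername pattern
    // described above; not the actual reporting-task implementation.
    static String qualifiedName(String processorUuid, String clusterName) {
        return processorUuid + "@" + clusterName;
    }

    public static void main(String[] args) {
        String invokeHttpUuid = "03f60cff-0aca-4536-a4ff-00eab811600c";
        // nifi_queue: keyed by the UUID of the processor the queue feeds into.
        String queueName = qualifiedName(invokeHttpUuid, "clustername");
        // nifi_data: keyed by the UUID of the processor emitting the FETCH event.
        String dataName = qualifiedName(invokeHttpUuid, "clustername");
        System.out.println(queueName.equals(dataName)); // identical keys
    }
}
```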
These two entities have the same _qualifiedName_, causing the duplicate key
error.
> Fix: ReportLineageToAtlas creates data and queue with same qualified name
> leading to exception
> ----------------------------------------------------------------------------------------------
>
> Key: NIFI-6937
> URL: https://issues.apache.org/jira/browse/NIFI-6937
> Project: Apache NiFi
> Issue Type: Bug
> Reporter: Tamas Palfy
> Priority: Major
> Attachments: atlas_duplicate_key.xml
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)