-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/73329/#review223005
-----------------------------------------------------------




webapp/src/main/java/org/apache/atlas/notification/EntityCorrelationManager.java
Lines 93 (patched)
<https://reviews.apache.org/r/73329/#comment312160>

    This checks for the first entry whose timestamp is less than spooledTimestamp.
    Instead, we should fetch the entry whose timestamp is closest to spooledTimestamp.
    
    CACHE:
    -----------------------------------
    QName  | Guid                     | 
    -----------------------------------
    T1@cl1 | [7:00: guid1], [7:40: guid2]
    -----------------------------------
    
    6:50 - CTAS (T5) FROM T1 (guid1)
    7:20 - CTAS (T6) FROM T1 (guid2)
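
To illustrate the comment, assume the cache keeps a sorted map per qualifiedName from delete timestamp to GUID, as in the table above. A lookup of the latest entry strictly before the spooled timestamp (`lowerEntry`) and a lookup of the earliest delete at or after it (`ceilingEntry`) give different answers for the two CTAS statements. `ceilingEntry` is one reading of "close proximity" that matches the example. The class and method names are hypothetical and are not taken from the patch.

```java
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical cache of deleted entities: qualifiedName -> (delete time -> GUID).
// Not the actual EntityCorrelationManager API.
class DeletedEntityCache {
    private final Map<String, NavigableMap<LocalTime, String>> deletes = new HashMap<>();

    void recordDelete(String qualifiedName, LocalTime deletedAt, String guid) {
        deletes.computeIfAbsent(qualifiedName, k -> new TreeMap<>()).put(deletedAt, guid);
    }

    // Lookup described in the comment: latest entry strictly before spooledAt.
    String guidBefore(String qualifiedName, LocalTime spooledAt) {
        NavigableMap<LocalTime, String> byTime = deletes.get(qualifiedName);
        Map.Entry<LocalTime, String> e = byTime == null ? null : byTime.lowerEntry(spooledAt);
        return e == null ? null : e.getValue();
    }

    // Alternative: earliest delete at or after spooledAt, i.e. the entity
    // instance that still existed when the lineage message was spooled.
    String guidAtOrAfter(String qualifiedName, LocalTime spooledAt) {
        NavigableMap<LocalTime, String> byTime = deletes.get(qualifiedName);
        Map.Entry<LocalTime, String> e = byTime == null ? null : byTime.ceilingEntry(spooledAt);
        return e == null ? null : e.getValue();
    }
}
```

With deletes recorded at 7:00 (guid1) and 7:40 (guid2), `guidAtOrAfter` returns guid1 for 6:50 and guid2 for 7:20, which matches the expected stitching. `guidBefore` returns nothing for 6:50 and guid1 for 7:20.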


- Sarath Subramanian


On May 17, 2021, 10:39 p.m., Ashutosh Mestry wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/73329/
> -----------------------------------------------------------
> 
> (Updated May 17, 2021, 10:39 p.m.)
> 
> 
> Review request for atlas, Radhika Kundam and Sarath Subramanian.
> 
> 
> Bugs: ATLAS-4152
>     https://issues.apache.org/jira/browse/ATLAS-4152
> 
> 
> Repository: atlas
> 
> 
> Description
> -------
> 
> **Background**
> As part of ATLAS-4204, HS2 notifications send only entity lineage (provided
> the corresponding property is enabled).
> 
> When spooling is enabled, the order of messages can change: notification
> messages coming from HS2 and HMS may not arrive in the same order as they
> would with direct notification.
> 
> Problem:
> Consider the following sequences of arriving messages for Entity 1
> (C = create, U = update, D = delete, L?x = lineage of type 'x'):
> No problem: C1, U1, L1x, L1y, D1
> Problem: C1, U1, D1, L1x, L1y
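
Here is a minimal sketch of the condition that separates the two sequences. It assumes messages are represented by the short codes used above (`C1`, `U1`, `L1x`, `D1`). The class and method are hypothetical.

```java
import java.util.List;

// Hypothetical check for the problematic ordering: a lineage message ("L...")
// for an entity arriving after that entity's delete ("D...").
class MessageOrderCheck {
    static boolean lineageAfterDelete(List<String> messages) {
        boolean deleted = false;
        for (String msg : messages) {
            if (msg.startsWith("D")) {
                deleted = true;           // the entity is gone from this point on
            } else if (msg.startsWith("L") && deleted) {
                return true;              // lineage refers to a deleted entity
            }
        }
        return false;
    }
}
```

For example, `lineageAfterDelete(List.of("C1", "U1", "L1x", "L1y", "D1"))` returns false, and `lineageAfterDelete(List.of("C1", "U1", "D1", "L1x", "L1y"))` returns true.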
> 
> This implementation attempts to handle the problem above. If it is not
> handled, lineage messages that arrive after a delete will create shell
> entities, since deleted entities are not looked up during entity creation.
> 
> **Approach**
> Uses a bounded-stream approach: each incoming stream of messages is bounded
> with an indicator that it originates from the spool. This helps make
> localized decisions on the incoming stream of messages.
> 
> High-level approach:
> - Messages are tagged with a timestamp when written to the spool.
> - Deleted entities are maintained in a cache.
> - Lineage-only messages are checked for references to a deleted entity.
> - If they refer to a deleted entity, they are stitched to the one present in
> the cache, but only if it falls within the threshold.
> - A step-climbing approach is used to locate the right entity to stitch the
> lineage to.
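
The steps above can be sketched as follows. The sketch assumes timestamps are epoch milliseconds and that the threshold bounds the gap between the spooled timestamp and the later delete. The names (`CorrelationSketch`, `onDelete`, `stitch`) and the threshold semantics are assumptions. The step-climbing search is not reproduced; a plain `ceilingEntry` lookup stands in for it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of the high-level approach; not the patch's code.
class CorrelationSketch {
    private final long thresholdMs;
    // qualifiedName -> (delete timestamp -> GUID of the deleted entity)
    private final Map<String, NavigableMap<Long, String>> deleted = new HashMap<>();

    CorrelationSketch(long thresholdMs) {
        this.thresholdMs = thresholdMs;
    }

    // Called when a delete notification is processed.
    void onDelete(String qualifiedName, long deleteTs, String guid) {
        deleted.computeIfAbsent(qualifiedName, k -> new TreeMap<>()).put(deleteTs, guid);
    }

    // Called for a lineage-only message whose entity is not found among live
    // entities. Returns the GUID to stitch to, or null when nothing in the
    // cache falls within the threshold (caller falls back to normal processing).
    String stitch(String qualifiedName, long spooledTs) {
        NavigableMap<Long, String> byTs = deleted.get(qualifiedName);
        if (byTs == null) {
            return null;
        }
        Map.Entry<Long, String> e = byTs.ceilingEntry(spooledTs);
        if (e == null || e.getKey() - spooledTs > thresholdMs) {
            return null;
        }
        return e.getValue();
    }
}
```

Returning null rather than throwing keeps the existing path unchanged for messages that cannot be correlated.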
> 
> New: _EntityCorrelationManager_: Uses the message timestamp and a cached
> entity qualifiedName-to-GUID map.
> Modified: _NotificationHookConsumer_: Uses the new class.
> New: _HiveDDLLineagePreprocess_: Uses entity correlation to link to deleted
> entities.
> Modified: _SpoolConfiguration_: Added a new configuration to pause before
> sending messages once the destination becomes available:
> _atlas.hook.spool.pause.before.send.sec_.
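
Here is a sketch of how a publisher might honor the new setting. It uses `java.util.Properties` so that it stays self-contained; the patch's own configuration handling is not shown. The 0-second default, the clamping of negative values, and the helper names are assumptions, not taken from the patch.

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: pause before sending spooled messages once the
// destination is available again. Default and clamping are assumptions.
class PauseBeforeSend {
    static final String PAUSE_PROPERTY = "atlas.hook.spool.pause.before.send.sec";

    static long pauseMillis(Properties conf) {
        long seconds = Long.parseLong(conf.getProperty(PAUSE_PROPERTY, "0").trim());
        return TimeUnit.SECONDS.toMillis(Math.max(0L, seconds));
    }

    static void pauseIfConfigured(Properties conf) throws InterruptedException {
        long millis = pauseMillis(conf);
        if (millis > 0) {
            Thread.sleep(millis);
        }
    }
}
```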
> 
> 
> Diffs
> -----
> 
>   intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java 810ba97c9
>   notification/src/main/java/org/apache/atlas/hook/AtlasHook.java 9162ac144
>   notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java f7d9668ec
>   notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java 22bd79fdf
>   notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java 3d1b3ccf1
>   notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java 3264e264c
>   notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java edd8ed931
>   notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java 2d7d19595
>   notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java 22242c933
>   notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java a9a3a78cc
>   notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java 2cacaaadc
>   notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java d7e4959f7
>   notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java 167efbecc
>   webapp/src/main/java/org/apache/atlas/notification/EntityCorrelationManager.java PRE-CREATION
>   webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java 84cc8d813
>   webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java 89568e236
>   webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java PRE-CREATION
>   webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java 86e3384ee
>   webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java PRE-CREATION
>   webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java 608b4a304
>   webapp/src/test/java/org/apache/atlas/notification/EntityCorrelationManagerTest.java PRE-CREATION
> 
> 
> Diff: https://reviews.apache.org/r/73329/diff/7/
> 
> 
> Testing
> -------
> 
> **Functional tests**
> Manual verification of scenarios.
> 
> **Test data**
> 11:55: Kafka Down!
> 12:00: create table t01(c01 string);
> 12:10: create view t06_vw as select * from t01;
> 12:20: create view t06_vw_1 as select * from t01;
> 12:30: create view t06_vw_2 as select * from t01;
> 12:40: create view t06_vw_3 as select * from t01;
> 12:50: DROP TABLE t01;
> 12:52: create table t01(c01 string);
> 12:53: create view t06_vwx as select * from t01;
> 12:54: create view t06_vwx_1 as select * from t01;
> 12:55: create view t06_vwx_2 as select * from t01;
> 12:56: create view t06_vwx_3 as select * from t01;
> 12:57: DROP TABLE T01;
> 12:58: create table t01(c01 string);
> 01:00: Kafka UP!
> 
> 
> Thanks,
> 
> Ashutosh Mestry
> 
>
