> On May 18, 2021, 4:34 a.m., Sarath Subramanian wrote:
> > webapp/src/main/java/org/apache/atlas/notification/EntityCorrelationManager.java
> > Lines 76 (patched)
> > <https://reviews.apache.org/r/73329/diff/6/?file=2250100#file2250100line76>
> >
> >     We don't need to reverse-iterate the list, since we iterate over the
> > entire list anyway.
> >     
> >     Consider maintaining a minimum value: the timestamp in the cache should
> > be the one closest to the spooled timestamp. Consider using the method
> > below for better readability:
> >     
> >     ```
> >         public String getGuidForDeletedEntity(String qualifiedName, long spooledMsgTimestamp) {
> >             if (!this.entitiesDeletedByDelete.containsKey(qualifiedName) || spooledMsgTimestamp <= 0) {
> >                 return null;
> >             }
> >     
> >             String                             ret                = null;
> >             List<TypesUtil.Pair<Long, String>> timestampGuidPairs = this.entitiesDeletedByDelete.get(qualifiedName);
> >             long                               minTimestamp       = Long.MAX_VALUE;
> >     
> >             for (TypesUtil.Pair<Long, String> tsGuidPair : timestampGuidPairs) {
> >                 String entityGuid            = tsGuidPair.right;
> >                 long   entityDeleteTimestamp = tsGuidPair.left;
> >                 long   timestampDifference   = Math.abs(entityDeleteTimestamp - spooledMsgTimestamp);
> >     
> >                 if (timestampDifference < minTimestamp) {
> >                     minTimestamp = timestampDifference;
> >                     ret          = entityGuid;
> >                 }
> >             }
> >     
> >             return ret;
> >         }
> >     ```

I modified the logic and added a unit test to cover it. The new version is 
better than before and does not use Math.abs.
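
For illustration only, here is a minimal sketch of one way such a lookup could avoid Math.abs. It is not the code in the patch. It assumes that a spooled lineage message should be stitched to the first deletion recorded at or after the message's spool timestamp, and that the match is accepted only within a threshold. The class name `DeletedEntityLookupSketch`, its methods, and the `thresholdMs` parameter are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; this is NOT the EntityCorrelationManager from the patch.
final class DeletedEntityLookupSketch {
    // qualifiedName -> (delete timestamp -> GUID), kept sorted by delete timestamp
    private final Map<String, TreeMap<Long, String>> deletedByName = new HashMap<>();
    // Assumed threshold: maximum allowed gap between spool time and delete time
    private final long thresholdMs;

    DeletedEntityLookupSketch(long thresholdMs) {
        this.thresholdMs = thresholdMs;
    }

    // Remember that an entity with this qualifiedName was deleted at deleteTimestamp.
    void recordDelete(String qualifiedName, long deleteTimestamp, String guid) {
        deletedByName.computeIfAbsent(qualifiedName, k -> new TreeMap<>())
                     .put(deleteTimestamp, guid);
    }

    // Return the GUID of the first deletion at or after the spooled timestamp,
    // or null if there is none within the threshold.
    String findGuid(String qualifiedName, long spooledMsgTimestamp) {
        TreeMap<Long, String> deletions = deletedByName.get(qualifiedName);

        if (deletions == null || spooledMsgTimestamp <= 0) {
            return null;
        }

        Map.Entry<Long, String> next = deletions.ceilingEntry(spooledMsgTimestamp);

        if (next == null || next.getKey() - spooledMsgTimestamp > thresholdMs) {
            return null;
        }

        return next.getValue();
    }
}
```

Taking the test data from the review request with timestamps expressed as minutes of the day, t01 is dropped at 12:50 (770) and again at 12:57 (777). In this sketch a message spooled at 12:53 (773) resolves to the 12:57 drop, whereas a nearest-by-absolute-difference match would pick the 12:50 drop, which is 3 minutes away rather than 4.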


- Ashutosh


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/73329/#review223002
-----------------------------------------------------------


On May 18, 2021, 5:39 a.m., Ashutosh Mestry wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/73329/
> -----------------------------------------------------------
> 
> (Updated May 18, 2021, 5:39 a.m.)
> 
> 
> Review request for atlas, Radhika Kundam and Sarath Subramanian.
> 
> 
> Bugs: ATLAS-4152
>     https://issues.apache.org/jira/browse/ATLAS-4152
> 
> 
> Repository: atlas
> 
> 
> Description
> -------
> 
> **Background**
> As part of ATLAS-4204, HS2 notifications send entity-lineage only (provided 
> the property is enabled).
> 
> When spooling is enabled, the order of messages can potentially change. The 
> notification messages coming from HS2 and HMS may not be in the same order as 
> when they arrived with direct notification.
> 
> Problem: 
> Consider the sequence of arriving messages:
> 
> This is the sequence of messages for Entity 1 (C = create, U = update, D = 
> delete, L?x = Lineage of type 'x')
> No problem: C1, U1, L1x, L1y, D1
> Problem: C1, U1, D1, L1x, L1y
> 
> This implementation attempts to handle the problem mentioned above. If the 
> above case is not handled, processing the lineage messages ends up creating 
> shell entities, since deleted entities are not looked up as part of entity 
> creation.
> 
> **Approach**
> Uses a bounded-stream approach, where an incoming stream of messages is 
> bounded with an indicator that it originates from the spool. This helps make 
> localized decisions on the incoming stream of messages.
> 
> High-level approach:
> - Messages when written to the spool are tagged with a timestamp.
> - Deleted entities are maintained in a cache.
> - Lineage-only messages are checked to see whether they refer to a deleted 
> entity.
> - If they refer to a deleted entity, they are stitched to the one present in 
> the cache only if it falls within the threshold.
> - A step-climbing approach is used to locate the right entity to stitch 
> lineage to.
> 
> New: _EntityCorrelationManager_: Uses message timestamp and cached entity 
> qualifiedName-GUID map.
> Modified: _NotificationHookConsumer_: Uses the new class.
> New: _HiveDDLLineagePreprocess_: Uses entity-correlation to link to deleted 
> entities.
> Modified: _SpoolConfiguration_: Added new configuration to pause message 
> sending after destination is available: 
> _atlas.hook.spool.pause.before.send.sec_.
> 
> 
> Diffs
> -----
> 
>   intg/src/main/java/org/apache/atlas/model/notification/AtlasNotificationMessage.java 810ba97c9 
>   notification/src/main/java/org/apache/atlas/hook/AtlasHook.java 9162ac144 
>   notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java f7d9668ec 
>   notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java 22bd79fdf 
>   notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java 3d1b3ccf1 
>   notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java 3264e264c 
>   notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java edd8ed931 
>   notification/src/main/java/org/apache/atlas/notification/spool/AtlasFileSpool.java 2d7d19595 
>   notification/src/main/java/org/apache/atlas/notification/spool/Publisher.java 22242c933 
>   notification/src/main/java/org/apache/atlas/notification/spool/SpoolConfiguration.java a9a3a78cc 
>   notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java 2cacaaadc 
>   notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java d7e4959f7 
>   notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java 167efbecc 
>   webapp/src/main/java/org/apache/atlas/notification/EntityCorrelationManager.java PRE-CREATION 
>   webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java 84cc8d813 
>   webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java 89568e236 
>   webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveDbDDLPreprocessor.java PRE-CREATION 
>   webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java 86e3384ee 
>   webapp/src/main/java/org/apache/atlas/notification/preprocessor/HiveTableDDLPreprocessor.java PRE-CREATION 
>   webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java 608b4a304 
>   webapp/src/test/java/org/apache/atlas/notification/EntityCorrelationManagerTest.java PRE-CREATION 
> 
> 
> Diff: https://reviews.apache.org/r/73329/diff/7/
> 
> 
> Testing
> -------
> 
> **Functional tests**
> Manual verification of scenarios.
> 
> **Test data**
> 11:55: Kafka Down!
> 12:00: create table t01(c01 string);
> 12:10: create view t06_vw as select * from t01;
> 12:20: create view t06_vw_1 as select * from t01;
> 12:30: create view t06_vw_2 as select * from t01;
> 12:40: create view t06_vw_3 as select * from t01;
> 12:50: DROP TABLE t01;
> 12:52: create table t01(c01 string);
> 12:53: create view t06_vwx as select * from t01;
> 12:54: create view t06_vwx_1 as select * from t01;
> 12:55: create view t06_vwx_2 as select * from t01;
> 12:56: create view t06_vwx_3 as select * from t01;
> 12:57: DROP TABLE T01;
> 12:58: create table t01(c01 string);
> 01:00: Kafka UP!
> 
> 
> Thanks,
> 
> Ashutosh Mestry
> 
>
