This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new b24f88e  ATLAS-3405: Handling of references to non-existing entities 
in notifications
b24f88e is described below

commit b24f88ecd70d73a8ca8752417317537eb3207343
Author: Sarath Subramanian <[email protected]>
AuthorDate: Sun Sep 8 09:47:15 2019 -0700

    ATLAS-3405: Handling of references to non-existing entities in notifications
---
 .../org/apache/atlas/repository/Constants.java     |  9 ++--
 .../java/org/apache/atlas/AtlasConfiguration.java  |  3 ++
 .../apache/atlas/model/instance/AtlasEntity.java   | 45 ++++++++++++-----
 .../atlas/model/instance/AtlasEntityHeader.java    | 15 +++++-
 .../repository/graph/GraphBackedSearchIndexer.java |  1 +
 .../apache/atlas/repository/graph/GraphHelper.java | 31 ++++--------
 .../graph/v2/AtlasEntityGraphDiscoveryV2.java      | 12 +++--
 .../store/graph/v2/AtlasEntityStoreV2.java         | 12 ++++-
 .../store/graph/v2/EntityGraphMapper.java          | 57 +++++++++++++++++++---
 .../store/graph/v2/EntityGraphRetriever.java       | 25 +++-------
 .../store/graph/v2/IDBasedEntityResolver.java      |  6 ++-
 .../graph/v2/UniqAttrBasedEntityResolver.java      | 15 ++++--
 .../main/java/org/apache/atlas/RequestContext.java |  9 +++-
 .../notification/EntityNotificationListenerV2.java |  2 +
 .../notification/NotificationHookConsumer.java     | 10 ++--
 .../org/apache/atlas/web/filters/AuditFilter.java  | 11 +++--
 16 files changed, 176 insertions(+), 87 deletions(-)

diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java 
b/common/src/main/java/org/apache/atlas/repository/Constants.java
index 673a85c..2f08efc 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -135,6 +135,8 @@ public final class Constants {
 
     public static final String MODIFICATION_TIMESTAMP_PROPERTY_KEY = 
encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "modificationTimestamp");
 
+    public static final String IS_INCOMPLETE_PROPERTY_KEY = 
encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "isIncomplete");
+
     /**
      * search backing index name.
      */
@@ -192,9 +194,10 @@ public final class Constants {
      * replication attributes
      */
 
-    public static final String ATTR_NAME_REFERENCEABLE = "Referenceable.";
-    public static final String ATTR_NAME_REPLICATED_TO = "replicatedTo";
-    public static final String ATTR_NAME_REPLICATED_FROM = "replicatedFrom";
+    public static final String  ATTR_NAME_REFERENCEABLE   = "Referenceable.";
+    public static final String  ATTR_NAME_REPLICATED_TO   = "replicatedTo";
+    public static final String  ATTR_NAME_REPLICATED_FROM = "replicatedFrom";
+    public static final Integer INCOMPLETE_ENTITY_VALUE   = Integer.valueOf(1);
 
     private Constants() {
     }
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java 
b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 9da51f5..7b71e51 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -48,6 +48,9 @@ public enum AtlasConfiguration {
     
NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds",
 15 * 60),
     
NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS("atlas.notification.split.message.buffer.purge.interval.seconds",
 5 * 60),
 
+    
NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF("atlas.notification.consumer.create.shell.entity.for.non-existing.ref",
 true),
+    
REST_API_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF("atlas.rest.create.shell.entity.for.non-existing.ref",
 false),
+
     
GRAPHSTORE_INDEXED_STRING_SAFE_LENGTH("atlas.graphstore.indexed.string.safe.length",
 Short.MAX_VALUE),  // based on 
org.apache.hadoop.hbase.client.Mutation.checkRow()
 
     //search configuration
diff --git 
a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java 
b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
index 659f7d5..67493ba 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
@@ -59,16 +59,17 @@ import static 
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
 public class AtlasEntity extends AtlasStruct implements Serializable {
     private static final long serialVersionUID = 1L;
 
-    public static final String KEY_GUID               = "guid";
-    public static final String KEY_HOME_ID            = "homeId";
-    public static final String KEY_IS_PROXY           = "isProxy";
-    public static final String KEY_PROVENANCE_TYPE    = "provenanceType";
-    public static final String KEY_STATUS             = "status";
-    public static final String KEY_CREATED_BY         = "createdBy";
-    public static final String KEY_UPDATED_BY         = "updatedBy";
-    public static final String KEY_CREATE_TIME        = "createTime";
-    public static final String KEY_UPDATE_TIME        = "updateTime";
-    public static final String KEY_VERSION            = "version";
+    public static final String KEY_GUID            = "guid";
+    public static final String KEY_HOME_ID         = "homeId";
+    public static final String KEY_IS_PROXY        = "isProxy";
+    public static final String KEY_IS_INCOMPLETE   = "isIncomplete";
+    public static final String KEY_PROVENANCE_TYPE = "provenanceType";
+    public static final String KEY_STATUS          = "status";
+    public static final String KEY_CREATED_BY      = "createdBy";
+    public static final String KEY_UPDATED_BY      = "updatedBy";
+    public static final String KEY_CREATE_TIME     = "createTime";
+    public static final String KEY_UPDATE_TIME     = "updateTime";
+    public static final String KEY_VERSION         = "version";
 
     /**
      * Status of the entity - can be active or deleted. Deleted entities are 
not removed from Atlas store.
@@ -78,6 +79,7 @@ public class AtlasEntity extends AtlasStruct implements 
Serializable {
     private String  guid           = null;
     private String  homeId         = null;
     private Boolean isProxy        = Boolean.FALSE;
+    private Boolean isIncomplete   = Boolean.FALSE;
     private Integer provenanceType = 0;
     private Status  status         = Status.ACTIVE;
     private String  createdBy      = null;
@@ -133,6 +135,7 @@ public class AtlasEntity extends AtlasStruct implements 
Serializable {
             Object oGuid             = map.get(KEY_GUID);
             Object homeId            = map.get(KEY_HOME_ID);
             Object isProxy           = map.get(KEY_IS_PROXY);
+            Object isIncomplete      = map.get(KEY_IS_INCOMPLETE);
             Object provenanceType    = map.get(KEY_PROVENANCE_TYPE);
             Object status            = map.get(KEY_STATUS);
             Object createdBy         = map.get(KEY_CREATED_BY);
@@ -156,6 +159,12 @@ public class AtlasEntity extends AtlasStruct implements 
Serializable {
                 setIsProxy(Boolean.FALSE);
             }
 
+            if (isIncomplete != null) {
+                setIsIncomplete((Boolean) isIncomplete);
+            } else {
+                setIsIncomplete(Boolean.FALSE);
+            }
+
             if (provenanceType instanceof Number) {
                 setProvenanceType(((Number) version).intValue());
             }
@@ -193,6 +202,7 @@ public class AtlasEntity extends AtlasStruct implements 
Serializable {
             setGuid(other.getGuid());
             setHomeId(other.getHomeId());
             setIsProxy(other.isProxy());
+            setIsIncomplete(other.getIsIncomplete());
             setProvenanceType(other.getProvenanceType());
             setStatus(other.getStatus());
             setCreatedBy(other.getCreatedBy());
@@ -230,6 +240,14 @@ public class AtlasEntity extends AtlasStruct implements 
Serializable {
         this.isProxy = isProxy;
     }
 
+    public Boolean getIsIncomplete() {
+        return isIncomplete;
+    }
+
+    public void setIsIncomplete(Boolean isIncomplete) {
+        this.isIncomplete = isIncomplete;
+    }
+
     public Integer getProvenanceType() {
         return provenanceType;
     }
@@ -355,6 +373,7 @@ public class AtlasEntity extends AtlasStruct implements 
Serializable {
         setGuid(nextInternalId());
         setHomeId(null);
         setIsProxy(Boolean.FALSE);
+        setIsIncomplete(Boolean.FALSE);
         setProvenanceType(0);
         setStatus(null);
         setCreatedBy(null);
@@ -380,6 +399,7 @@ public class AtlasEntity extends AtlasStruct implements 
Serializable {
         sb.append("guid='").append(guid).append('\'');
         sb.append(", homeId='").append(homeId).append('\'');
         sb.append(", isProxy='").append(isProxy).append('\'');
+        sb.append(", isIncomplete=").append(isIncomplete);
         sb.append(", provenanceType=").append(provenanceType);
         sb.append(", status=").append(status);
         sb.append(", createdBy='").append(createdBy).append('\'');
@@ -411,6 +431,7 @@ public class AtlasEntity extends AtlasStruct implements 
Serializable {
         return Objects.equals(guid, that.guid) &&
                 Objects.equals(homeId, that.homeId) &&
                 Objects.equals(isProxy, that.isProxy) &&
+                Objects.equals(isIncomplete, that.isIncomplete) &&
                 Objects.equals(provenanceType, that.provenanceType) &&
                 status == that.status &&
                 Objects.equals(createdBy, that.createdBy) &&
@@ -424,8 +445,8 @@ public class AtlasEntity extends AtlasStruct implements 
Serializable {
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), guid, homeId, isProxy, 
provenanceType, status, createdBy, updatedBy, createTime, updateTime, version,
-                            relationshipAttributes, classifications);
+        return Objects.hash(super.hashCode(), guid, homeId, isProxy, 
isIncomplete, provenanceType, status,
+                createdBy, updatedBy, createTime, updateTime, version, 
relationshipAttributes, classifications);
     }
 
     @Override
diff --git 
a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java 
b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java
index 26687bf..3aef6cd 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java
@@ -60,6 +60,7 @@ public class AtlasEntityHeader extends AtlasStruct implements 
Serializable {
     private List<AtlasClassification>       classifications     = null;
     private List<String>                    meaningNames        = null;
     private List<AtlasTermAssignmentHeader> meanings            = null;
+    private Boolean                         isIncomplete        = 
Boolean.FALSE;
 
     public AtlasEntityHeader() {
         this(null, null);
@@ -98,6 +99,7 @@ public class AtlasEntityHeader extends AtlasStruct implements 
Serializable {
             setDisplayText(other.getDisplayText());
             setClassificationNames(other.getClassificationNames());
             setClassifications(other.getClassifications());
+            setIsIncomplete(other.getIsIncomplete());
         }
     }
 
@@ -106,6 +108,7 @@ public class AtlasEntityHeader extends AtlasStruct 
implements Serializable {
         setGuid(entity.getGuid());
         setStatus(entity.getStatus());
         setClassifications(entity.getClassifications());
+        setIsIncomplete(entity.getIsIncomplete());
 
         if (CollectionUtils.isNotEmpty(entity.getClassifications())) {
             this.classificationNames = new 
ArrayList<>(entity.getClassifications().size());
@@ -156,6 +159,14 @@ public class AtlasEntityHeader extends AtlasStruct 
implements Serializable {
         this.classifications = classifications;
     }
 
+    public Boolean getIsIncomplete() {
+        return isIncomplete;
+    }
+
+    public void setIsIncomplete(Boolean isIncomplete) {
+        this.isIncomplete = isIncomplete;
+    }
+
     @Override
     public StringBuilder toString(StringBuilder sb) {
         if (sb == null) {
@@ -172,6 +183,7 @@ public class AtlasEntityHeader extends AtlasStruct 
implements Serializable {
         sb.append("classifications=[");
         AtlasBaseTypeDef.dumpObjects(classifications, sb);
         sb.append("], ");
+        sb.append("isIncomplete=").append(isIncomplete);
         super.toString(sb);
         sb.append('}');
 
@@ -190,12 +202,13 @@ public class AtlasEntityHeader extends AtlasStruct 
implements Serializable {
                        Objects.equals(classificationNames, 
that.classificationNames) &&
                        Objects.equals(meaningNames, that.classificationNames) 
&&
                        Objects.equals(classifications, that.classifications) &&
+                       Objects.equals(isIncomplete, that.isIncomplete) &&
                        Objects.equals(meanings, that.meanings);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(super.hashCode(), guid, status, displayText, 
classificationNames, classifications, meaningNames, meanings);
+        return Objects.hash(super.hashCode(), guid, status, displayText, 
classificationNames, classifications, meaningNames, meanings, isIncomplete);
     }
 
     @Override
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
 
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index a1776c8..981cfc7 100755
--- 
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -323,6 +323,7 @@ public class GraphBackedSearchIndexer implements 
SearchIndexer, ActiveStateChang
             createCommonVertexIndex(management, 
PROPAGATED_CLASSIFICATION_NAMES_KEY, UniqueKind.NONE, String.class, SINGLE, 
true, false);
             createCommonVertexIndex(management, TRAIT_NAMES_PROPERTY_KEY, 
UniqueKind.NONE, String.class, SET, true, true);
             createCommonVertexIndex(management, 
PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, UniqueKind.NONE, String.class, LIST, true, 
true);
+            createCommonVertexIndex(management, IS_INCOMPLETE_PROPERTY_KEY, 
UniqueKind.NONE, Integer.class, SINGLE, true, true);
 
             createCommonVertexIndex(management, PATCH_ID_PROPERTY_KEY, 
UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
             createCommonVertexIndex(management, 
PATCH_DESCRIPTION_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true, 
false);
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java 
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
index a58c670..ebf5498 100755
--- 
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
@@ -78,28 +78,8 @@ import java.util.UUID;
 
 import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
 import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
-import static 
org.apache.atlas.repository.Constants.ATTRIBUTE_INDEX_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.ATTRIBUTE_KEY_PROPERTY_KEY;
-import static 
org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
-import static 
org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_STATUS;
-import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
-import static 
org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY;
-
-import static 
org.apache.atlas.repository.Constants.CLASSIFICATION_NAME_DELIMITER;
-import static 
org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_NAME_KEY;
-import static 
org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_PROPAGATE_KEY;
-import static 
org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_REMOVE_PROPAGATIONS_KEY;
-import static org.apache.atlas.repository.Constants.CREATED_BY_KEY;
-import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
-import static 
org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY;
-import static 
org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY;
-import static 
org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY;
-import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.SUPER_TYPES_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.TERM_ASSIGNMENT_LABEL;
-import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY;
+
+import static org.apache.atlas.repository.Constants.*;
 import static 
org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isReference;
 import static 
org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH;
 import static 
org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
@@ -1078,6 +1058,13 @@ public final class GraphHelper {
         return element.getProperty(Constants.IS_PROXY_KEY, Boolean.class);
     }
 
+    // True only when the element's IS_INCOMPLETE_PROPERTY_KEY property equals INCOMPLETE_ENTITY_VALUE (absent => FALSE).
+    public static Boolean isEntityIncomplete(AtlasElement element) {
+        Integer value = element.getProperty(Constants.IS_INCOMPLETE_PROPERTY_KEY, Integer.class);
+        // equals(), not '==': '==' on two boxed Integers compares references, not values.
+        return INCOMPLETE_ENTITY_VALUE.equals(value) ? Boolean.TRUE : Boolean.FALSE;
+    }
+
     public static Integer getProvenanceType(AtlasElement element) {
         return element.getProperty(Constants.PROVENANCE_TYPE_KEY, 
Integer.class);
     }
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
index 44df351..5de2d7c 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
@@ -56,10 +56,12 @@ public class AtlasEntityGraphDiscoveryV2 implements 
EntityGraphDiscovery {
 
     private final AtlasTypeRegistry           typeRegistry;
     private final EntityGraphDiscoveryContext discoveryContext;
+    private final EntityGraphMapper           entityGraphMapper;
 
-    public AtlasEntityGraphDiscoveryV2(AtlasTypeRegistry typeRegistry, 
EntityStream entityStream) {
-        this.typeRegistry     = typeRegistry;
-        this.discoveryContext = new EntityGraphDiscoveryContext(typeRegistry, 
entityStream);
+    public AtlasEntityGraphDiscoveryV2(AtlasTypeRegistry typeRegistry, 
EntityStream entityStream, EntityGraphMapper entityGraphMapper) {
+        this.typeRegistry      = typeRegistry;
+        this.discoveryContext  = new EntityGraphDiscoveryContext(typeRegistry, 
entityStream);
+        this.entityGraphMapper = entityGraphMapper;
     }
 
     @Override
@@ -177,8 +179,8 @@ public class AtlasEntityGraphDiscoveryV2 implements 
EntityGraphDiscovery {
     protected void resolveReferences() throws AtlasBaseException {
         MetricRecorder metric = 
RequestContext.get().startMetricRecord("resolveReferences");
 
-        EntityResolver[] entityResolvers = new EntityResolver[] { new 
IDBasedEntityResolver(typeRegistry),
-                                                                  new 
UniqAttrBasedEntityResolver(typeRegistry)
+        EntityResolver[] entityResolvers = new EntityResolver[] { new 
IDBasedEntityResolver(typeRegistry, entityGraphMapper),
+                                                                  new 
UniqAttrBasedEntityResolver(typeRegistry, entityGraphMapper)
                                                                 };
 
         for (EntityResolver resolver : entityResolvers) {
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index c495dcc..c697026 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -55,8 +55,11 @@ import org.springframework.stereotype.Component;
 import javax.inject.Inject;
 import java.util.*;
 
+import static java.lang.Boolean.FALSE;
 import static 
org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
 import static 
org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
+import static org.apache.atlas.repository.Constants.IS_INCOMPLETE_PROPERTY_KEY;
+import static org.apache.atlas.repository.graph.GraphHelper.isEntityIncomplete;
 
 
 @Component
@@ -881,7 +884,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore 
{
     private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, 
EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws 
AtlasBaseException {
         MetricRecorder metric = 
RequestContext.get().startMetricRecord("preCreateOrUpdate");
 
-        EntityGraphDiscovery        graphDiscoverer  = new 
AtlasEntityGraphDiscoveryV2(typeRegistry, entityStream);
+        EntityGraphDiscovery        graphDiscoverer  = new 
AtlasEntityGraphDiscoveryV2(typeRegistry, entityStream, entityGraphMapper);
         EntityGraphDiscoveryContext discoveryContext = 
graphDiscoverer.discoverEntities();
         EntityMutationContext       context          = new 
EntityMutationContext(discoveryContext);
         RequestContext              requestContext   = RequestContext.get();
@@ -903,6 +906,13 @@ public class AtlasEntityStoreV2 implements 
AtlasEntityStore {
                 if (vertex != null) {
                     if (!isPartialUpdate) {
                         graphDiscoverer.validateAndNormalize(entity);
+
+                        // change entity 'isIncomplete' to 'false' during full update
+                        if (isEntityIncomplete(vertex)) {
+                            vertex.removeProperty(IS_INCOMPLETE_PROPERTY_KEY);
+
+                            entity.setIsIncomplete(FALSE);
+                        }
                     } else {
                         graphDiscoverer.validateAndNormalizeForUpdate(entity);
                     }
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 8ddbf74..c02f809 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -45,6 +45,7 @@ import org.apache.atlas.repository.graphdb.AtlasEdge;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
+import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
 import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
 import org.apache.atlas.type.AtlasArrayType;
 import org.apache.atlas.type.AtlasBuiltInTypes;
@@ -140,14 +141,49 @@ public class EntityGraphMapper {
         return createVertexWithGuid(entity, guid);
     }
 
+    public AtlasVertex createShellEntityVertex(AtlasObjectId objectId, 
EntityGraphDiscoveryContext context) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> createShellEntityVertex({})", 
objectId.getTypeName());
+        }
+
+        final String    guid       = UUID.randomUUID().toString();
+        AtlasEntityType entityType = 
typeRegistry.getEntityTypeByName(objectId.getTypeName());
+        AtlasVertex     ret        = createStructVertex(objectId);
+
+        for (String superTypeName : entityType.getAllSuperTypes()) {
+            AtlasGraphUtilsV2.addEncodedProperty(ret, 
SUPER_TYPES_PROPERTY_KEY, superTypeName);
+        }
+
+        AtlasGraphUtilsV2.setEncodedProperty(ret, GUID_PROPERTY_KEY, guid);
+        AtlasGraphUtilsV2.setEncodedProperty(ret, VERSION_PROPERTY_KEY, 
getEntityVersion(null));
+        AtlasGraphUtilsV2.setEncodedProperty(ret, IS_INCOMPLETE_PROPERTY_KEY, 
INCOMPLETE_ENTITY_VALUE);
+
+        // map unique attributes
+        Map<String, Object>   uniqueAttributes = 
objectId.getUniqueAttributes();
+        EntityMutationContext mutationContext  = new 
EntityMutationContext(context);
+
+        for (AtlasAttribute attribute : 
entityType.getUniqAttributes().values()) {
+            String attrName  = attribute.getName();
+
+            if (uniqueAttributes.containsKey(attrName)) {
+                Object attrValue = 
attribute.getAttributeType().getNormalizedValue(uniqueAttributes.get(attrName));
+
+                mapAttribute(attribute, attrValue, ret, CREATE, 
mutationContext);
+            }
+        }
+
+        GraphTransactionInterceptor.addToVertexCache(guid, ret);
+
+        return ret;
+    }
+
     public AtlasVertex createVertexWithGuid(AtlasEntity entity, String guid) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("==> createVertex({})", entity.getTypeName());
+            LOG.debug("==> createVertexWithGuid({})", entity.getTypeName());
         }
 
         AtlasEntityType entityType = 
typeRegistry.getEntityTypeByName(entity.getTypeName());
-
-        AtlasVertex ret = createStructVertex(entity);
+        AtlasVertex     ret        = createStructVertex(entity);
 
         for (String superTypeName : entityType.getAllSuperTypes()) {
             AtlasGraphUtilsV2.addEncodedProperty(ret, 
SUPER_TYPES_PROPERTY_KEY, superTypeName);
@@ -271,13 +307,21 @@ public class EntityGraphMapper {
     }
 
     private AtlasVertex createStructVertex(AtlasStruct struct) {
+        return createStructVertex(struct.getTypeName());
+    }
+
+    private AtlasVertex createStructVertex(AtlasObjectId objectId) {
+        return createStructVertex(objectId.getTypeName());
+    }
+
+    private AtlasVertex createStructVertex(String typeName) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("==> createStructVertex({})", struct.getTypeName());
+            LOG.debug("==> createStructVertex({})", typeName);
         }
 
         final AtlasVertex ret = graph.addVertex();
 
-        AtlasGraphUtilsV2.setEncodedProperty(ret, ENTITY_TYPE_PROPERTY_KEY, 
struct.getTypeName());
+        AtlasGraphUtilsV2.setEncodedProperty(ret, ENTITY_TYPE_PROPERTY_KEY, 
typeName);
         AtlasGraphUtilsV2.setEncodedProperty(ret, STATE_PROPERTY_KEY, 
AtlasEntity.Status.ACTIVE.name());
         AtlasGraphUtilsV2.setEncodedProperty(ret, TIMESTAMP_PROPERTY_KEY, 
RequestContext.get().getRequestTime());
         AtlasGraphUtilsV2.setEncodedProperty(ret, 
MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
@@ -285,7 +329,7 @@ public class EntityGraphMapper {
         AtlasGraphUtilsV2.setEncodedProperty(ret, MODIFIED_BY_KEY, 
RequestContext.get().getUser());
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("<== createStructVertex({})", struct.getTypeName());
+            LOG.debug("<== createStructVertex({})", typeName);
         }
 
         return ret;
@@ -1437,6 +1481,7 @@ public class EntityGraphMapper {
 
         header.setGuid(getIdFromVertex(vertex));
         header.setStatus(entity.getStatus());
+        header.setIsIncomplete(entity.getIsIncomplete());
 
         for (AtlasAttribute attribute : type.getUniqAttributes().values()) {
             header.setAttribute(attribute.getName(), 
entity.getAttribute(attribute.getName()));
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
index 02638c3..c921130 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
@@ -101,23 +101,7 @@ import static 
org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
 import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
 import static 
org.apache.atlas.repository.Constants.CLASSIFICATION_VALIDITY_PERIODS_KEY;
 import static org.apache.atlas.repository.Constants.TERM_ASSIGNMENT_LABEL;
-import static 
org.apache.atlas.repository.graph.GraphHelper.getAdjacentEdgesByLabel;
-import static 
org.apache.atlas.repository.graph.GraphHelper.getAllClassificationEdges;
-import static org.apache.atlas.repository.graph.GraphHelper.getAllTraitNames;
-import static 
org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds;
-import static 
org.apache.atlas.repository.graph.GraphHelper.getArrayElementsProperty;
-import static 
org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityStatus;
-import static 
org.apache.atlas.repository.graph.GraphHelper.getClassificationVertices;
-import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
-import static 
org.apache.atlas.repository.graph.GraphHelper.getIncomingEdgesByLabel;
-import static org.apache.atlas.repository.graph.GraphHelper.getPrimitiveMap;
-import static org.apache.atlas.repository.graph.GraphHelper.getReferenceMap;
-import static 
org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel;
-import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
-import static 
org.apache.atlas.repository.graph.GraphHelper.getRelationshipGuid;
-import static 
org.apache.atlas.repository.graph.GraphHelper.getRemovePropagations;
-import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
-import static 
org.apache.atlas.repository.graph.GraphHelper.isPropagationEnabled;
+import static org.apache.atlas.repository.graph.GraphHelper.*;
 import static 
org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
 import static 
org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isReference;
 import static 
org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
@@ -508,13 +492,15 @@ public class EntityGraphRetriever {
     private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex 
entityVertex, Set<String> attributes) throws AtlasBaseException {
         AtlasEntityHeader ret = new AtlasEntityHeader();
 
-        String typeName = 
entityVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class);
-        String guid     = 
entityVertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class);
+        String  typeName     = 
entityVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class);
+        String  guid         = 
entityVertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class);
+        Boolean isIncomplete = isEntityIncomplete(entityVertex);
 
         ret.setTypeName(typeName);
         ret.setGuid(guid);
         ret.setStatus(GraphHelper.getStatus(entityVertex));
         ret.setClassificationNames(getAllTraitNames(entityVertex));
+        ret.setIsIncomplete(isIncomplete);
 
         List<AtlasTermAssignmentHeader> termAssignmentHeaders = 
mapAssignedTerms(entityVertex);
         ret.setMeanings(termAssignmentHeaders);
@@ -593,6 +579,7 @@ public class EntityGraphRetriever {
         entity.setHomeId(GraphHelper.getHomeId(entityVertex));
 
         entity.setIsProxy(GraphHelper.isProxy(entityVertex));
+        entity.setIsIncomplete(isEntityIncomplete(entityVertex));
 
         entity.setProvenanceType(GraphHelper.getProvenanceType(entityVertex));
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IDBasedEntityResolver.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IDBasedEntityResolver.java
index fe76b3a..3b96948 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IDBasedEntityResolver.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IDBasedEntityResolver.java
@@ -36,9 +36,11 @@ public class IDBasedEntityResolver implements EntityResolver {
     private static final Logger LOG = LoggerFactory.getLogger(IDBasedEntityResolver.class);
 
     private final AtlasTypeRegistry typeRegistry;
+    private final EntityGraphMapper entityGraphMapper;
 
-    public IDBasedEntityResolver(AtlasTypeRegistry typeRegistry) {
-        this.typeRegistry = typeRegistry;
+    public IDBasedEntityResolver(AtlasTypeRegistry typeRegistry, EntityGraphMapper entityGraphMapper) {
+        this.typeRegistry      = typeRegistry;
+        this.entityGraphMapper = entityGraphMapper;
     }
 
     public EntityGraphDiscoveryContext resolveEntityReferences(EntityGraphDiscoveryContext context) throws AtlasBaseException {
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/UniqAttrBasedEntityResolver.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/UniqAttrBasedEntityResolver.java
index d400a1d..14ddc5d 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/UniqAttrBasedEntityResolver.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/UniqAttrBasedEntityResolver.java
@@ -18,6 +18,7 @@
 package org.apache.atlas.repository.store.graph.v2;
 
 import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasObjectId;
@@ -32,14 +33,15 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 
-
 public class UniqAttrBasedEntityResolver implements EntityResolver {
     private static final Logger LOG = LoggerFactory.getLogger(UniqAttrBasedEntityResolver.class);
 
     private final AtlasTypeRegistry typeRegistry;
+    private final EntityGraphMapper entityGraphMapper;
 
-    public UniqAttrBasedEntityResolver(AtlasTypeRegistry typeRegistry) {
-        this.typeRegistry = typeRegistry;
+    public UniqAttrBasedEntityResolver(AtlasTypeRegistry typeRegistry, EntityGraphMapper entityGraphMapper) {
+        this.typeRegistry      = typeRegistry;
+        this.entityGraphMapper = entityGraphMapper;
     }
 
     @Override
@@ -61,6 +63,10 @@ public class UniqAttrBasedEntityResolver implements EntityResolver {
 
             AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(entityType, objId.getUniqueAttributes());
 
+            if (vertex == null && RequestContext.get().isCreateShellEntityForNonExistingReference()) {
+                vertex = entityGraphMapper.createShellEntityVertex(objId, context);
+            }
+
             if (vertex != null) {
                 context.addResolvedIdByUniqAttribs(objId, vertex);
                 resolvedReferences.add(objId);
@@ -71,5 +77,4 @@ public class UniqAttrBasedEntityResolver implements EntityResolver {
 
         return context;
     }
-}
-
+}
\ No newline at end of file
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java b/server-api/src/main/java/org/apache/atlas/RequestContext.java
index d518609..0c3ba08 100644
--- a/server-api/src/main/java/org/apache/atlas/RequestContext.java
+++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java
@@ -57,7 +57,7 @@ public class RequestContext {
     private boolean     isImportInProgress = false;
     private boolean     isInNotificationProcessing = false;
     private boolean     isInTypePatching           = false;
-
+    private boolean     createShellEntityForNonExistingReference = false;
 
     private RequestContext() {
     }
@@ -182,6 +182,13 @@ public class RequestContext {
         isInTypePatching = inTypePatching;
     }
 
+    public boolean isCreateShellEntityForNonExistingReference() {
+        return createShellEntityForNonExistingReference;
+    }
+
+    public void setCreateShellEntityForNonExistingReference(boolean createShellEntityForNonExistingReference) {
+        this.createShellEntityForNonExistingReference = createShellEntityForNonExistingReference;
+    }
 
     public void recordEntityUpdate(AtlasEntityHeader entity) {
         if (entity != null && entity.getGuid() != null) {
diff --git a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
index a844bcd..b2211fc 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
@@ -177,6 +177,8 @@ public class EntityNotificationListenerV2 implements EntityChangeListenerV2 {
 
         ret.setGuid(entity.getGuid());
         ret.setStatus(entity.getStatus());
+        ret.setIsIncomplete(entity.getIsIncomplete());
+
         setAttribute(ret, NAME, name);
         setAttribute(ret, DESCRIPTION, entity.getAttribute(DESCRIPTION));
         setAttribute(ret, OWNER, entity.getAttribute(OWNER));
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index f7df6b3..41a6c2e 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -20,12 +20,7 @@ package org.apache.atlas.notification;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import kafka.utils.ShutdownableThread;
-import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasClientV2;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.AtlasServiceException;
-import org.apache.atlas.RequestContext;
+import org.apache.atlas.*;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
@@ -171,6 +166,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     private final boolean                       hiveTypesRemoveOwnedRefAttrs;
     private final boolean                       rdbmsTypesRemoveOwnedRefAttrs;
     private final boolean                       preprocessEnabled;
+    private final boolean createShellEntityForNonExistingReference;
     private final NotificationInterface         notificationInterface;
     private final Configuration                 applicationProperties;
     private       ExecutorService               executors;
@@ -205,6 +201,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         skipHiveColumnLineageHive20633InputsThreshold = applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15); // skip if avg # of inputs is > 15
         consumerDisabled                              = applicationProperties.getBoolean(CONSUMER_DISABLED, false);
         largeMessageProcessingTimeThresholdMs         = applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms", 60 * 1000);  //  60 sec by default
+        createShellEntityForNonExistingReference      = AtlasConfiguration.NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
 
         String[] patternHiveTablesToIgnore = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN);
         String[] patternHiveTablesToPrune  = applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN);
@@ -586,6 +583,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
                         requestContext.setUser(messageUser, null);
                         requestContext.setInNotificationProcessing(true);
+                        requestContext.setCreateShellEntityForNonExistingReference(createShellEntityForNonExistingReference);
 
                         switch (message.getType()) {
                             case ENTITY_CREATE: {
diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
index 54f7330..e9c44b3 100755
--- a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
@@ -45,7 +45,7 @@ import java.util.Date;
 import java.util.Set;
 import java.util.UUID;
 
-import static org.apache.atlas.AtlasConfiguration.REST_API_ENABLE_DELETE_TYPE_OVERRIDE;
+import static org.apache.atlas.AtlasConfiguration.*;
 
 /**
  * This records audit information as part of the filter after processing the request
@@ -56,13 +56,15 @@ public class AuditFilter implements Filter {
     private static final Logger LOG       = LoggerFactory.getLogger(AuditFilter.class);
     private static final Logger AUDIT_LOG = LoggerFactory.getLogger("AUDIT");
 
-    private boolean deleteTypeOverrideEnabled = false;
+    private boolean deleteTypeOverrideEnabled                = false;
+    private boolean createShellEntityForNonExistingReference = false;
 
     @Override
     public void init(FilterConfig filterConfig) throws ServletException {
         LOG.info("AuditFilter initialization started");
 
-        deleteTypeOverrideEnabled = REST_API_ENABLE_DELETE_TYPE_OVERRIDE.getBoolean();
+        deleteTypeOverrideEnabled                = REST_API_ENABLE_DELETE_TYPE_OVERRIDE.getBoolean();
+        createShellEntityForNonExistingReference = REST_API_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
 
         LOG.info("REST_API_ENABLE_DELETE_TYPE_OVERRIDE={}", deleteTypeOverrideEnabled);
     }
@@ -88,6 +90,7 @@ public class AuditFilter implements Filter {
             RequestContext requestContext = RequestContext.get();
             requestContext.setUser(user, userGroups);
             requestContext.setClientIPAddress(AtlasAuthorizationUtils.getRequestIpAddress(httpRequest));
+            requestContext.setCreateShellEntityForNonExistingReference(createShellEntityForNonExistingReference);
 
             if (StringUtils.isNotEmpty(deleteType)) {
                 if (deleteTypeOverrideEnabled) {
@@ -196,4 +199,4 @@ public class AuditFilter implements Filter {
             return sb.toString();
         }
     }
-}
+}
\ No newline at end of file

Reply via email to