This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new b24f88e ATLAS-3405: Handling of references to non-existing entities in notifications
b24f88e is described below
commit b24f88ecd70d73a8ca8752417317537eb3207343
Author: Sarath Subramanian <[email protected]>
AuthorDate: Sun Sep 8 09:47:15 2019 -0700
ATLAS-3405: Handling of references to non-existing entities in notifications
---
.../org/apache/atlas/repository/Constants.java | 9 ++--
.../java/org/apache/atlas/AtlasConfiguration.java | 3 ++
.../apache/atlas/model/instance/AtlasEntity.java | 45 ++++++++++++-----
.../atlas/model/instance/AtlasEntityHeader.java | 15 +++++-
.../repository/graph/GraphBackedSearchIndexer.java | 1 +
.../apache/atlas/repository/graph/GraphHelper.java | 31 ++++--------
.../graph/v2/AtlasEntityGraphDiscoveryV2.java | 12 +++--
.../store/graph/v2/AtlasEntityStoreV2.java | 12 ++++-
.../store/graph/v2/EntityGraphMapper.java | 57 +++++++++++++++++++---
.../store/graph/v2/EntityGraphRetriever.java | 25 +++-------
.../store/graph/v2/IDBasedEntityResolver.java | 6 ++-
.../graph/v2/UniqAttrBasedEntityResolver.java | 15 ++++--
.../main/java/org/apache/atlas/RequestContext.java | 9 +++-
.../notification/EntityNotificationListenerV2.java | 2 +
.../notification/NotificationHookConsumer.java | 10 ++--
.../org/apache/atlas/web/filters/AuditFilter.java | 11 +++--
16 files changed, 176 insertions(+), 87 deletions(-)
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java
b/common/src/main/java/org/apache/atlas/repository/Constants.java
index 673a85c..2f08efc 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -135,6 +135,8 @@ public final class Constants {
public static final String MODIFICATION_TIMESTAMP_PROPERTY_KEY =
encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "modificationTimestamp");
+ public static final String IS_INCOMPLETE_PROPERTY_KEY =
encodePropertyKey(INTERNAL_PROPERTY_KEY_PREFIX + "isIncomplete");
+
/**
* search backing index name.
*/
@@ -192,9 +194,10 @@ public final class Constants {
* replication attributes
*/
- public static final String ATTR_NAME_REFERENCEABLE = "Referenceable.";
- public static final String ATTR_NAME_REPLICATED_TO = "replicatedTo";
- public static final String ATTR_NAME_REPLICATED_FROM = "replicatedFrom";
+ public static final String ATTR_NAME_REFERENCEABLE = "Referenceable.";
+ public static final String ATTR_NAME_REPLICATED_TO = "replicatedTo";
+ public static final String ATTR_NAME_REPLICATED_FROM = "replicatedFrom";
+ public static final Integer INCOMPLETE_ENTITY_VALUE = Integer.valueOf(1);
private Constants() {
}
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 9da51f5..7b71e51 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -48,6 +48,9 @@ public enum AtlasConfiguration {
NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds",
15 * 60),
NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS("atlas.notification.split.message.buffer.purge.interval.seconds",
5 * 60),
+
NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF("atlas.notification.consumer.create.shell.entity.for.non-existing.ref",
true),
+
REST_API_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF("atlas.rest.create.shell.entity.for.non-existing.ref",
false),
+
GRAPHSTORE_INDEXED_STRING_SAFE_LENGTH("atlas.graphstore.indexed.string.safe.length",
Short.MAX_VALUE), // based on
org.apache.hadoop.hbase.client.Mutation.checkRow()
//search configuration
diff --git
a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
index 659f7d5..67493ba 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java
@@ -59,16 +59,17 @@ import static
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
public class AtlasEntity extends AtlasStruct implements Serializable {
private static final long serialVersionUID = 1L;
- public static final String KEY_GUID = "guid";
- public static final String KEY_HOME_ID = "homeId";
- public static final String KEY_IS_PROXY = "isProxy";
- public static final String KEY_PROVENANCE_TYPE = "provenanceType";
- public static final String KEY_STATUS = "status";
- public static final String KEY_CREATED_BY = "createdBy";
- public static final String KEY_UPDATED_BY = "updatedBy";
- public static final String KEY_CREATE_TIME = "createTime";
- public static final String KEY_UPDATE_TIME = "updateTime";
- public static final String KEY_VERSION = "version";
+ public static final String KEY_GUID = "guid";
+ public static final String KEY_HOME_ID = "homeId";
+ public static final String KEY_IS_PROXY = "isProxy";
+ public static final String KEY_IS_INCOMPLETE = "isIncomplete";
+ public static final String KEY_PROVENANCE_TYPE = "provenanceType";
+ public static final String KEY_STATUS = "status";
+ public static final String KEY_CREATED_BY = "createdBy";
+ public static final String KEY_UPDATED_BY = "updatedBy";
+ public static final String KEY_CREATE_TIME = "createTime";
+ public static final String KEY_UPDATE_TIME = "updateTime";
+ public static final String KEY_VERSION = "version";
/**
* Status of the entity - can be active or deleted. Deleted entities are
not removed from Atlas store.
@@ -78,6 +79,7 @@ public class AtlasEntity extends AtlasStruct implements
Serializable {
private String guid = null;
private String homeId = null;
private Boolean isProxy = Boolean.FALSE;
+ private Boolean isIncomplete = Boolean.FALSE;
private Integer provenanceType = 0;
private Status status = Status.ACTIVE;
private String createdBy = null;
@@ -133,6 +135,7 @@ public class AtlasEntity extends AtlasStruct implements
Serializable {
Object oGuid = map.get(KEY_GUID);
Object homeId = map.get(KEY_HOME_ID);
Object isProxy = map.get(KEY_IS_PROXY);
+ Object isIncomplete = map.get(KEY_IS_INCOMPLETE);
Object provenanceType = map.get(KEY_PROVENANCE_TYPE);
Object status = map.get(KEY_STATUS);
Object createdBy = map.get(KEY_CREATED_BY);
@@ -156,6 +159,12 @@ public class AtlasEntity extends AtlasStruct implements
Serializable {
setIsProxy(Boolean.FALSE);
}
+ if (isIncomplete != null) {
+ setIsIncomplete((Boolean) isIncomplete);
+ } else {
+ setIsIncomplete(Boolean.FALSE);
+ }
+
if (provenanceType instanceof Number) {
setProvenanceType(((Number) version).intValue());
}
@@ -193,6 +202,7 @@ public class AtlasEntity extends AtlasStruct implements
Serializable {
setGuid(other.getGuid());
setHomeId(other.getHomeId());
setIsProxy(other.isProxy());
+ setIsIncomplete(other.getIsIncomplete());
setProvenanceType(other.getProvenanceType());
setStatus(other.getStatus());
setCreatedBy(other.getCreatedBy());
@@ -230,6 +240,14 @@ public class AtlasEntity extends AtlasStruct implements
Serializable {
this.isProxy = isProxy;
}
+ public Boolean getIsIncomplete() {
+ return isIncomplete;
+ }
+
+ public void setIsIncomplete(Boolean isIncomplete) {
+ this.isIncomplete = isIncomplete;
+ }
+
public Integer getProvenanceType() {
return provenanceType;
}
@@ -355,6 +373,7 @@ public class AtlasEntity extends AtlasStruct implements
Serializable {
setGuid(nextInternalId());
setHomeId(null);
setIsProxy(Boolean.FALSE);
+ setIsIncomplete(Boolean.FALSE);
setProvenanceType(0);
setStatus(null);
setCreatedBy(null);
@@ -380,6 +399,7 @@ public class AtlasEntity extends AtlasStruct implements
Serializable {
sb.append("guid='").append(guid).append('\'');
sb.append(", homeId='").append(homeId).append('\'');
sb.append(", isProxy='").append(isProxy).append('\'');
+ sb.append(", isIncomplete=").append(isIncomplete);
sb.append(", provenanceType=").append(provenanceType);
sb.append(", status=").append(status);
sb.append(", createdBy='").append(createdBy).append('\'');
@@ -411,6 +431,7 @@ public class AtlasEntity extends AtlasStruct implements
Serializable {
return Objects.equals(guid, that.guid) &&
Objects.equals(homeId, that.homeId) &&
Objects.equals(isProxy, that.isProxy) &&
+ Objects.equals(isIncomplete, that.isIncomplete) &&
Objects.equals(provenanceType, that.provenanceType) &&
status == that.status &&
Objects.equals(createdBy, that.createdBy) &&
@@ -424,8 +445,8 @@ public class AtlasEntity extends AtlasStruct implements
Serializable {
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), guid, homeId, isProxy,
provenanceType, status, createdBy, updatedBy, createTime, updateTime, version,
- relationshipAttributes, classifications);
+ return Objects.hash(super.hashCode(), guid, homeId, isProxy,
isIncomplete, provenanceType, status,
+ createdBy, updatedBy, createTime, updateTime, version,
relationshipAttributes, classifications);
}
@Override
diff --git
a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java
b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java
index 26687bf..3aef6cd 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeader.java
@@ -60,6 +60,7 @@ public class AtlasEntityHeader extends AtlasStruct implements
Serializable {
private List<AtlasClassification> classifications = null;
private List<String> meaningNames = null;
private List<AtlasTermAssignmentHeader> meanings = null;
+ private Boolean isIncomplete =
Boolean.FALSE;
public AtlasEntityHeader() {
this(null, null);
@@ -98,6 +99,7 @@ public class AtlasEntityHeader extends AtlasStruct implements
Serializable {
setDisplayText(other.getDisplayText());
setClassificationNames(other.getClassificationNames());
setClassifications(other.getClassifications());
+ setIsIncomplete(other.getIsIncomplete());
}
}
@@ -106,6 +108,7 @@ public class AtlasEntityHeader extends AtlasStruct
implements Serializable {
setGuid(entity.getGuid());
setStatus(entity.getStatus());
setClassifications(entity.getClassifications());
+ setIsIncomplete(entity.getIsIncomplete());
if (CollectionUtils.isNotEmpty(entity.getClassifications())) {
this.classificationNames = new
ArrayList<>(entity.getClassifications().size());
@@ -156,6 +159,14 @@ public class AtlasEntityHeader extends AtlasStruct
implements Serializable {
this.classifications = classifications;
}
+ public Boolean getIsIncomplete() {
+ return isIncomplete;
+ }
+
+ public void setIsIncomplete(Boolean isIncomplete) {
+ this.isIncomplete = isIncomplete;
+ }
+
@Override
public StringBuilder toString(StringBuilder sb) {
if (sb == null) {
@@ -172,6 +183,7 @@ public class AtlasEntityHeader extends AtlasStruct
implements Serializable {
sb.append("classifications=[");
AtlasBaseTypeDef.dumpObjects(classifications, sb);
sb.append("], ");
+ sb.append("isIncomplete=").append(isIncomplete);
super.toString(sb);
sb.append('}');
@@ -190,12 +202,13 @@ public class AtlasEntityHeader extends AtlasStruct
implements Serializable {
Objects.equals(classificationNames,
that.classificationNames) &&
Objects.equals(meaningNames, that.classificationNames)
&&
Objects.equals(classifications, that.classifications) &&
+ Objects.equals(isIncomplete, that.isIncomplete) &&
Objects.equals(meanings, that.meanings);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), guid, status, displayText,
classificationNames, classifications, meaningNames, meanings);
+ return Objects.hash(super.hashCode(), guid, status, displayText,
classificationNames, classifications, meaningNames, meanings, isIncomplete);
}
@Override
diff --git
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index a1776c8..981cfc7 100755
---
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -323,6 +323,7 @@ public class GraphBackedSearchIndexer implements
SearchIndexer, ActiveStateChang
createCommonVertexIndex(management,
PROPAGATED_CLASSIFICATION_NAMES_KEY, UniqueKind.NONE, String.class, SINGLE,
true, false);
createCommonVertexIndex(management, TRAIT_NAMES_PROPERTY_KEY,
UniqueKind.NONE, String.class, SET, true, true);
createCommonVertexIndex(management,
PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, UniqueKind.NONE, String.class, LIST, true,
true);
+ createCommonVertexIndex(management, IS_INCOMPLETE_PROPERTY_KEY,
UniqueKind.NONE, Integer.class, SINGLE, true, true);
createCommonVertexIndex(management, PATCH_ID_PROPERTY_KEY,
UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false);
createCommonVertexIndex(management,
PATCH_DESCRIPTION_PROPERTY_KEY, UniqueKind.NONE, String.class, SINGLE, true,
false);
diff --git
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
index a58c670..ebf5498 100755
---
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
+++
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
@@ -78,28 +78,8 @@ import java.util.UUID;
import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
-import static
org.apache.atlas.repository.Constants.ATTRIBUTE_INDEX_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.ATTRIBUTE_KEY_PROPERTY_KEY;
-import static
org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
-import static
org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_STATUS;
-import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
-import static
org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_NAME_PROPERTY_KEY;
-
-import static
org.apache.atlas.repository.Constants.CLASSIFICATION_NAME_DELIMITER;
-import static
org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_NAME_KEY;
-import static
org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_PROPAGATE_KEY;
-import static
org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_REMOVE_PROPAGATIONS_KEY;
-import static org.apache.atlas.repository.Constants.CREATED_BY_KEY;
-import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
-import static
org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY;
-import static
org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_BLOCKED_PROPAGATED_CLASSIFICATIONS_KEY;
-import static
org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY;
-import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.SUPER_TYPES_PROPERTY_KEY;
-import static org.apache.atlas.repository.Constants.TERM_ASSIGNMENT_LABEL;
-import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY;
+
+import static org.apache.atlas.repository.Constants.*;
import static
org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isReference;
import static
org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.BOTH;
import static
org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
@@ -1078,6 +1058,13 @@ public final class GraphHelper {
return element.getProperty(Constants.IS_PROXY_KEY, Boolean.class);
}
+ public static Boolean isEntityIncomplete(AtlasElement element) {
+ Integer value =
element.getProperty(Constants.IS_INCOMPLETE_PROPERTY_KEY, Integer.class);
+ Boolean ret = (value != null && value == INCOMPLETE_ENTITY_VALUE) ?
Boolean.TRUE : Boolean.FALSE;
+
+ return ret;
+ }
+
public static Integer getProvenanceType(AtlasElement element) {
return element.getProperty(Constants.PROVENANCE_TYPE_KEY,
Integer.class);
}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
index 44df351..5de2d7c 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityGraphDiscoveryV2.java
@@ -56,10 +56,12 @@ public class AtlasEntityGraphDiscoveryV2 implements
EntityGraphDiscovery {
private final AtlasTypeRegistry typeRegistry;
private final EntityGraphDiscoveryContext discoveryContext;
+ private final EntityGraphMapper entityGraphMapper;
- public AtlasEntityGraphDiscoveryV2(AtlasTypeRegistry typeRegistry,
EntityStream entityStream) {
- this.typeRegistry = typeRegistry;
- this.discoveryContext = new EntityGraphDiscoveryContext(typeRegistry,
entityStream);
+ public AtlasEntityGraphDiscoveryV2(AtlasTypeRegistry typeRegistry,
EntityStream entityStream, EntityGraphMapper entityGraphMapper) {
+ this.typeRegistry = typeRegistry;
+ this.discoveryContext = new EntityGraphDiscoveryContext(typeRegistry,
entityStream);
+ this.entityGraphMapper = entityGraphMapper;
}
@Override
@@ -177,8 +179,8 @@ public class AtlasEntityGraphDiscoveryV2 implements
EntityGraphDiscovery {
protected void resolveReferences() throws AtlasBaseException {
MetricRecorder metric =
RequestContext.get().startMetricRecord("resolveReferences");
- EntityResolver[] entityResolvers = new EntityResolver[] { new
IDBasedEntityResolver(typeRegistry),
- new
UniqAttrBasedEntityResolver(typeRegistry)
+ EntityResolver[] entityResolvers = new EntityResolver[] { new
IDBasedEntityResolver(typeRegistry, entityGraphMapper),
+ new
UniqAttrBasedEntityResolver(typeRegistry, entityGraphMapper)
};
for (EntityResolver resolver : entityResolvers) {
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index c495dcc..c697026 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -55,8 +55,11 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.util.*;
+import static java.lang.Boolean.FALSE;
import static
org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
import static
org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
+import static org.apache.atlas.repository.Constants.IS_INCOMPLETE_PROPERTY_KEY;
+import static org.apache.atlas.repository.graph.GraphHelper.isEntityIncomplete;
@Component
@@ -881,7 +884,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore
{
private EntityMutationContext preCreateOrUpdate(EntityStream entityStream,
EntityGraphMapper entityGraphMapper, boolean isPartialUpdate) throws
AtlasBaseException {
MetricRecorder metric =
RequestContext.get().startMetricRecord("preCreateOrUpdate");
- EntityGraphDiscovery graphDiscoverer = new
AtlasEntityGraphDiscoveryV2(typeRegistry, entityStream);
+ EntityGraphDiscovery graphDiscoverer = new
AtlasEntityGraphDiscoveryV2(typeRegistry, entityStream, entityGraphMapper);
EntityGraphDiscoveryContext discoveryContext =
graphDiscoverer.discoverEntities();
EntityMutationContext context = new
EntityMutationContext(discoveryContext);
RequestContext requestContext = RequestContext.get();
@@ -903,6 +906,13 @@ public class AtlasEntityStoreV2 implements
AtlasEntityStore {
if (vertex != null) {
if (!isPartialUpdate) {
graphDiscoverer.validateAndNormalize(entity);
+
+ // change entity 'isIncomplete' to 'false' during full update
+ if (isEntityIncomplete(vertex)) {
+ vertex.removeProperty(IS_INCOMPLETE_PROPERTY_KEY);
+
+ entity.setIsIncomplete(FALSE);
+ }
} else {
graphDiscoverer.validateAndNormalizeForUpdate(entity);
}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index 8ddbf74..c02f809 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -45,6 +45,7 @@ import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.repository.store.graph.AtlasRelationshipStore;
+import org.apache.atlas.repository.store.graph.EntityGraphDiscoveryContext;
import org.apache.atlas.repository.store.graph.v1.DeleteHandlerDelegate;
import org.apache.atlas.type.AtlasArrayType;
import org.apache.atlas.type.AtlasBuiltInTypes;
@@ -140,14 +141,49 @@ public class EntityGraphMapper {
return createVertexWithGuid(entity, guid);
}
+ public AtlasVertex createShellEntityVertex(AtlasObjectId objectId,
EntityGraphDiscoveryContext context) throws AtlasBaseException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==> createShellEntityVertex({})",
objectId.getTypeName());
+ }
+
+ final String guid = UUID.randomUUID().toString();
+ AtlasEntityType entityType =
typeRegistry.getEntityTypeByName(objectId.getTypeName());
+ AtlasVertex ret = createStructVertex(objectId);
+
+ for (String superTypeName : entityType.getAllSuperTypes()) {
+ AtlasGraphUtilsV2.addEncodedProperty(ret,
SUPER_TYPES_PROPERTY_KEY, superTypeName);
+ }
+
+ AtlasGraphUtilsV2.setEncodedProperty(ret, GUID_PROPERTY_KEY, guid);
+ AtlasGraphUtilsV2.setEncodedProperty(ret, VERSION_PROPERTY_KEY,
getEntityVersion(null));
+ AtlasGraphUtilsV2.setEncodedProperty(ret, IS_INCOMPLETE_PROPERTY_KEY,
INCOMPLETE_ENTITY_VALUE);
+
+ // map unique attributes
+ Map<String, Object> uniqueAttributes =
objectId.getUniqueAttributes();
+ EntityMutationContext mutationContext = new
EntityMutationContext(context);
+
+ for (AtlasAttribute attribute :
entityType.getUniqAttributes().values()) {
+ String attrName = attribute.getName();
+
+ if (uniqueAttributes.containsKey(attrName)) {
+ Object attrValue =
attribute.getAttributeType().getNormalizedValue(uniqueAttributes.get(attrName));
+
+ mapAttribute(attribute, attrValue, ret, CREATE,
mutationContext);
+ }
+ }
+
+ GraphTransactionInterceptor.addToVertexCache(guid, ret);
+
+ return ret;
+ }
+
public AtlasVertex createVertexWithGuid(AtlasEntity entity, String guid) {
if (LOG.isDebugEnabled()) {
- LOG.debug("==> createVertex({})", entity.getTypeName());
+ LOG.debug("==> createVertexWithGuid({})", entity.getTypeName());
}
AtlasEntityType entityType =
typeRegistry.getEntityTypeByName(entity.getTypeName());
-
- AtlasVertex ret = createStructVertex(entity);
+ AtlasVertex ret = createStructVertex(entity);
for (String superTypeName : entityType.getAllSuperTypes()) {
AtlasGraphUtilsV2.addEncodedProperty(ret,
SUPER_TYPES_PROPERTY_KEY, superTypeName);
@@ -271,13 +307,21 @@ public class EntityGraphMapper {
}
private AtlasVertex createStructVertex(AtlasStruct struct) {
+ return createStructVertex(struct.getTypeName());
+ }
+
+ private AtlasVertex createStructVertex(AtlasObjectId objectId) {
+ return createStructVertex(objectId.getTypeName());
+ }
+
+ private AtlasVertex createStructVertex(String typeName) {
if (LOG.isDebugEnabled()) {
- LOG.debug("==> createStructVertex({})", struct.getTypeName());
+ LOG.debug("==> createStructVertex({})", typeName);
}
final AtlasVertex ret = graph.addVertex();
- AtlasGraphUtilsV2.setEncodedProperty(ret, ENTITY_TYPE_PROPERTY_KEY,
struct.getTypeName());
+ AtlasGraphUtilsV2.setEncodedProperty(ret, ENTITY_TYPE_PROPERTY_KEY,
typeName);
AtlasGraphUtilsV2.setEncodedProperty(ret, STATE_PROPERTY_KEY,
AtlasEntity.Status.ACTIVE.name());
AtlasGraphUtilsV2.setEncodedProperty(ret, TIMESTAMP_PROPERTY_KEY,
RequestContext.get().getRequestTime());
AtlasGraphUtilsV2.setEncodedProperty(ret,
MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
@@ -285,7 +329,7 @@ public class EntityGraphMapper {
AtlasGraphUtilsV2.setEncodedProperty(ret, MODIFIED_BY_KEY,
RequestContext.get().getUser());
if (LOG.isDebugEnabled()) {
- LOG.debug("<== createStructVertex({})", struct.getTypeName());
+ LOG.debug("<== createStructVertex({})", typeName);
}
return ret;
@@ -1437,6 +1481,7 @@ public class EntityGraphMapper {
header.setGuid(getIdFromVertex(vertex));
header.setStatus(entity.getStatus());
+ header.setIsIncomplete(entity.getIsIncomplete());
for (AtlasAttribute attribute : type.getUniqAttributes().values()) {
header.setAttribute(attribute.getName(),
entity.getAttribute(attribute.getName()));
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
index 02638c3..c921130 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
@@ -101,23 +101,7 @@ import static
org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
import static
org.apache.atlas.repository.Constants.CLASSIFICATION_VALIDITY_PERIODS_KEY;
import static org.apache.atlas.repository.Constants.TERM_ASSIGNMENT_LABEL;
-import static
org.apache.atlas.repository.graph.GraphHelper.getAdjacentEdgesByLabel;
-import static
org.apache.atlas.repository.graph.GraphHelper.getAllClassificationEdges;
-import static org.apache.atlas.repository.graph.GraphHelper.getAllTraitNames;
-import static
org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds;
-import static
org.apache.atlas.repository.graph.GraphHelper.getArrayElementsProperty;
-import static
org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityStatus;
-import static
org.apache.atlas.repository.graph.GraphHelper.getClassificationVertices;
-import static org.apache.atlas.repository.graph.GraphHelper.getGuid;
-import static
org.apache.atlas.repository.graph.GraphHelper.getIncomingEdgesByLabel;
-import static org.apache.atlas.repository.graph.GraphHelper.getPrimitiveMap;
-import static org.apache.atlas.repository.graph.GraphHelper.getReferenceMap;
-import static
org.apache.atlas.repository.graph.GraphHelper.getOutGoingEdgesByLabel;
-import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags;
-import static
org.apache.atlas.repository.graph.GraphHelper.getRelationshipGuid;
-import static
org.apache.atlas.repository.graph.GraphHelper.getRemovePropagations;
-import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
-import static
org.apache.atlas.repository.graph.GraphHelper.isPropagationEnabled;
+import static org.apache.atlas.repository.graph.GraphHelper.*;
import static
org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
import static
org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isReference;
import static
org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection;
@@ -508,13 +492,15 @@ public class EntityGraphRetriever {
private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex
entityVertex, Set<String> attributes) throws AtlasBaseException {
AtlasEntityHeader ret = new AtlasEntityHeader();
- String typeName =
entityVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class);
- String guid =
entityVertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class);
+ String typeName =
entityVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class);
+ String guid =
entityVertex.getProperty(Constants.GUID_PROPERTY_KEY, String.class);
+ Boolean isIncomplete = isEntityIncomplete(entityVertex);
ret.setTypeName(typeName);
ret.setGuid(guid);
ret.setStatus(GraphHelper.getStatus(entityVertex));
ret.setClassificationNames(getAllTraitNames(entityVertex));
+ ret.setIsIncomplete(isIncomplete);
List<AtlasTermAssignmentHeader> termAssignmentHeaders =
mapAssignedTerms(entityVertex);
ret.setMeanings(termAssignmentHeaders);
@@ -593,6 +579,7 @@ public class EntityGraphRetriever {
entity.setHomeId(GraphHelper.getHomeId(entityVertex));
entity.setIsProxy(GraphHelper.isProxy(entityVertex));
+ entity.setIsIncomplete(isEntityIncomplete(entityVertex));
entity.setProvenanceType(GraphHelper.getProvenanceType(entityVertex));
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IDBasedEntityResolver.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IDBasedEntityResolver.java
index fe76b3a..3b96948 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IDBasedEntityResolver.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/IDBasedEntityResolver.java
@@ -36,9 +36,11 @@ public class IDBasedEntityResolver implements EntityResolver
{
private static final Logger LOG =
LoggerFactory.getLogger(IDBasedEntityResolver.class);
private final AtlasTypeRegistry typeRegistry;
+ private final EntityGraphMapper entityGraphMapper;
- public IDBasedEntityResolver(AtlasTypeRegistry typeRegistry) {
- this.typeRegistry = typeRegistry;
+ public IDBasedEntityResolver(AtlasTypeRegistry typeRegistry,
EntityGraphMapper entityGraphMapper) {
+ this.typeRegistry = typeRegistry;
+ this.entityGraphMapper = entityGraphMapper;
}
public EntityGraphDiscoveryContext
resolveEntityReferences(EntityGraphDiscoveryContext context) throws
AtlasBaseException {
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/UniqAttrBasedEntityResolver.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/UniqAttrBasedEntityResolver.java
index d400a1d..14ddc5d 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/UniqAttrBasedEntityResolver.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/UniqAttrBasedEntityResolver.java
@@ -18,6 +18,7 @@
package org.apache.atlas.repository.store.graph.v2;
import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.RequestContext;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.instance.AtlasObjectId;
@@ -32,14 +33,15 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
-
public class UniqAttrBasedEntityResolver implements EntityResolver {
private static final Logger LOG =
LoggerFactory.getLogger(UniqAttrBasedEntityResolver.class);
private final AtlasTypeRegistry typeRegistry;
+ private final EntityGraphMapper entityGraphMapper;
- public UniqAttrBasedEntityResolver(AtlasTypeRegistry typeRegistry) {
- this.typeRegistry = typeRegistry;
+ public UniqAttrBasedEntityResolver(AtlasTypeRegistry typeRegistry,
EntityGraphMapper entityGraphMapper) {
+ this.typeRegistry = typeRegistry;
+ this.entityGraphMapper = entityGraphMapper;
}
@Override
@@ -61,6 +63,10 @@ public class UniqAttrBasedEntityResolver implements
EntityResolver {
AtlasVertex vertex =
AtlasGraphUtilsV2.findByUniqueAttributes(entityType,
objId.getUniqueAttributes());
+ if (vertex == null &&
RequestContext.get().isCreateShellEntityForNonExistingReference()) {
+ vertex = entityGraphMapper.createShellEntityVertex(objId,
context);
+ }
+
if (vertex != null) {
context.addResolvedIdByUniqAttribs(objId, vertex);
resolvedReferences.add(objId);
@@ -71,5 +77,4 @@ public class UniqAttrBasedEntityResolver implements
EntityResolver {
return context;
}
-}
-
+}
\ No newline at end of file
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java
b/server-api/src/main/java/org/apache/atlas/RequestContext.java
index d518609..0c3ba08 100644
--- a/server-api/src/main/java/org/apache/atlas/RequestContext.java
+++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java
@@ -57,7 +57,7 @@ public class RequestContext {
private boolean isImportInProgress = false;
private boolean isInNotificationProcessing = false;
private boolean isInTypePatching = false;
-
+ private boolean createShellEntityForNonExistingReference = false;
private RequestContext() {
}
@@ -182,6 +182,13 @@ public class RequestContext {
isInTypePatching = inTypePatching;
}
+ public boolean isCreateShellEntityForNonExistingReference() {
+ return createShellEntityForNonExistingReference;
+ }
+
+ public void setCreateShellEntityForNonExistingReference(boolean
createShellEntityForNonExistingReference) {
+ this.createShellEntityForNonExistingReference =
createShellEntityForNonExistingReference;
+ }
public void recordEntityUpdate(AtlasEntityHeader entity) {
if (entity != null && entity.getGuid() != null) {
diff --git
a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
index a844bcd..b2211fc 100644
---
a/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
+++
b/webapp/src/main/java/org/apache/atlas/notification/EntityNotificationListenerV2.java
@@ -177,6 +177,8 @@ public class EntityNotificationListenerV2 implements
EntityChangeListenerV2 {
ret.setGuid(entity.getGuid());
ret.setStatus(entity.getStatus());
+ ret.setIsIncomplete(entity.getIsIncomplete());
+
setAttribute(ret, NAME, name);
setAttribute(ret, DESCRIPTION, entity.getAttribute(DESCRIPTION));
setAttribute(ret, OWNER, entity.getAttribute(OWNER));
diff --git
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index f7df6b3..41a6c2e 100644
---
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -20,12 +20,7 @@ package org.apache.atlas.notification;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import kafka.utils.ShutdownableThread;
-import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasClient;
-import org.apache.atlas.AtlasClientV2;
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.AtlasServiceException;
-import org.apache.atlas.RequestContext;
+import org.apache.atlas.*;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.kafka.AtlasKafkaMessage;
@@ -171,6 +166,7 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
private final boolean hiveTypesRemoveOwnedRefAttrs;
private final boolean rdbmsTypesRemoveOwnedRefAttrs;
private final boolean preprocessEnabled;
+ private final boolean createShellEntityForNonExistingReference;
private final NotificationInterface notificationInterface;
private final Configuration applicationProperties;
private ExecutorService executors;
@@ -205,6 +201,7 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
skipHiveColumnLineageHive20633InputsThreshold =
applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD,
15); // skip if avg # of inputs is > 15
consumerDisabled =
applicationProperties.getBoolean(CONSUMER_DISABLED, false);
largeMessageProcessingTimeThresholdMs =
applicationProperties.getInt("atlas.notification.consumer.large.message.processing.time.threshold.ms",
60 * 1000); // 60 sec by default
+ createShellEntityForNonExistingReference =
AtlasConfiguration.NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
String[] patternHiveTablesToIgnore =
applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN);
String[] patternHiveTablesToPrune =
applicationProperties.getStringArray(CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN);
@@ -586,6 +583,7 @@ public class NotificationHookConsumer implements Service,
ActiveStateChangeHandl
requestContext.setUser(messageUser, null);
requestContext.setInNotificationProcessing(true);
+
requestContext.setCreateShellEntityForNonExistingReference(createShellEntityForNonExistingReference);
switch (message.getType()) {
case ENTITY_CREATE: {
diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
index 54f7330..e9c44b3 100755
--- a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
+++ b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java
@@ -45,7 +45,7 @@ import java.util.Date;
import java.util.Set;
import java.util.UUID;
-import static
org.apache.atlas.AtlasConfiguration.REST_API_ENABLE_DELETE_TYPE_OVERRIDE;
+import static org.apache.atlas.AtlasConfiguration.*;
/**
* This records audit information as part of the filter after processing the
request
@@ -56,13 +56,15 @@ public class AuditFilter implements Filter {
private static final Logger LOG =
LoggerFactory.getLogger(AuditFilter.class);
private static final Logger AUDIT_LOG = LoggerFactory.getLogger("AUDIT");
- private boolean deleteTypeOverrideEnabled = false;
+ private boolean deleteTypeOverrideEnabled = false;
+ private boolean createShellEntityForNonExistingReference = false;
@Override
public void init(FilterConfig filterConfig) throws ServletException {
LOG.info("AuditFilter initialization started");
- deleteTypeOverrideEnabled =
REST_API_ENABLE_DELETE_TYPE_OVERRIDE.getBoolean();
+ deleteTypeOverrideEnabled =
REST_API_ENABLE_DELETE_TYPE_OVERRIDE.getBoolean();
+ createShellEntityForNonExistingReference =
REST_API_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF.getBoolean();
LOG.info("REST_API_ENABLE_DELETE_TYPE_OVERRIDE={}",
deleteTypeOverrideEnabled);
}
@@ -88,6 +90,7 @@ public class AuditFilter implements Filter {
RequestContext requestContext = RequestContext.get();
requestContext.setUser(user, userGroups);
requestContext.setClientIPAddress(AtlasAuthorizationUtils.getRequestIpAddress(httpRequest));
+
requestContext.setCreateShellEntityForNonExistingReference(createShellEntityForNonExistingReference);
if (StringUtils.isNotEmpty(deleteType)) {
if (deleteTypeOverrideEnabled) {
@@ -196,4 +199,4 @@ public class AuditFilter implements Filter {
return sb.toString();
}
}
-}
+}
\ No newline at end of file