This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 8725f31 ATLAS-4348: Atlas-Kafka Hook: When a producer publishes
messages to multiple topics, the latest relationship is marked ACTIVE, rest
are marked DELETED
8725f31 is described below
commit 8725f31af9821907ced99a829b9e0dbbf3f6de2b
Author: Radhika Kundam <[email protected]>
AuthorDate: Wed Jul 14 22:12:31 2021 -0700
ATLAS-4348: Atlas-Kafka Hook: When a producer publishes messages to
multiple topics, the latest relationship is marked ACTIVE, rest are marked
DELETED
Signed-off-by: Sarath Subramanian <[email protected]>
---
.../019-kafka_producer_lineage_add_options.json | 15 +++++++
.../org/apache/atlas/type/AtlasEntityType.java | 28 ++++++++++---
.../org/apache/atlas/TestRelationshipUtilsV2.java | 2 +
.../store/graph/v2/EntityGraphMapper.java | 49 +++++++++++++++++++---
.../store/graph/v2/AtlasEntityTestBase.java | 2 +-
.../graph/v2/AtlasRelationshipStoreV2Test.java | 33 +++++++++++++++
6 files changed, 117 insertions(+), 12 deletions(-)
diff --git
a/addons/models/1000-Hadoop/patches/019-kafka_producer_lineage_add_options.json
b/addons/models/1000-Hadoop/patches/019-kafka_producer_lineage_add_options.json
new file mode 100644
index 0000000..bff4bd5
--- /dev/null
+++
b/addons/models/1000-Hadoop/patches/019-kafka_producer_lineage_add_options.json
@@ -0,0 +1,15 @@
+{
+ "patches": [
+ {
+ "id": "TYPEDEF_PATCH_1000_019_001",
+ "description": "Add 'appendRelationshipsOnPartialUpdate' typeDefOptions
to kafka_producer_lineage",
+ "action": "UPDATE_TYPEDEF_OPTIONS",
+ "typeName": "kafka_producer_lineage",
+ "applyToVersion": "1.0",
+ "updateToVersion": "1.1",
+ "typeDefOptions": {
+ "appendRelationshipsOnPartialUpdate": "[\"outputs\"]"
+ }
+ }
+ ]
+}
\ No newline at end of file
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
index ded6d63..6d83599 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
@@ -65,11 +65,12 @@ public class AtlasEntityType extends AtlasStructType {
private static final String OPTION_SCHEMA_ATTRIBUTES =
"schemaAttributes";
private static final String INTERNAL_TYPENAME = "__internal";
- private static final char NS_ATTRIBUTE_NAME_SEPARATOR = '.';
+ public static final String OPTION_APPEND_RELATIONSHIPS_ON_PARTIAL_UPDATE =
"appendRelationshipsOnPartialUpdate";
- private static final char DYN_ATTRIBUTE_NAME_SEPARATOR = '.';
- private static final char DYN_ATTRIBUTE_OPEN_DELIM = '{';
- private static final char DYN_ATTRIBUTE_CLOSE_DELIM = '}';
+ private static final char NS_ATTRIBUTE_NAME_SEPARATOR = '.';
+ private static final char DYN_ATTRIBUTE_NAME_SEPARATOR = '.';
+ private static final char DYN_ATTRIBUTE_OPEN_DELIM = '{';
+ private static final char DYN_ATTRIBUTE_CLOSE_DELIM = '}';
private static final String[] ENTITY_HEADER_ATTRIBUTES = new
String[]{NAME, DESCRIPTION, OWNER, CREATE_TIME};
private static final String ENTITY_ROOT_NAME = "__ENTITY_ROOT";
@@ -326,7 +327,11 @@ public class AtlasEntityType extends AtlasStructType {
String relationshipType = relationsEntry.getKey();
AtlasAttribute relationshipAttr = relationsEntry.getValue();
- relationshipAttrDefs.add(new
AtlasRelationshipAttributeDef(relationshipType,
relationshipAttr.isLegacyAttribute(), relationshipAttr.getAttributeDef()));
+ AtlasRelationshipAttributeDef relationshipAttributeDef = new
AtlasRelationshipAttributeDef(relationshipType,
relationshipAttr.isLegacyAttribute(), relationshipAttr.getAttributeDef());
+
+
updateRelationshipAttrDefForPartialUpdate(relationshipAttributeDef, entityDef);
+
+ relationshipAttrDefs.add(relationshipAttributeDef);
}
}
@@ -369,6 +374,19 @@ public class AtlasEntityType extends AtlasStructType {
}
}
+ private void
updateRelationshipAttrDefForPartialUpdate(AtlasRelationshipAttributeDef
relationshipAttributeDef, AtlasEntityDef entityDef) {
+ String appendRelationshipsOnPartialUpdate =
entityDef.getOption(OPTION_APPEND_RELATIONSHIPS_ON_PARTIAL_UPDATE);
+ String relationshipAttributeName =
relationshipAttributeDef.getName();
+
+ if (StringUtils.isNotEmpty(appendRelationshipsOnPartialUpdate)) {
+ Set<String> relationshipTypesToAppend =
AtlasType.fromJson(appendRelationshipsOnPartialUpdate, Set.class);
+
+ if (CollectionUtils.isNotEmpty(relationshipTypesToAppend) &&
relationshipTypesToAppend.contains(relationshipAttributeName)) {
+
relationshipAttributeDef.setOption(AtlasAttributeDef.ATTRDEF_OPTION_APPEND_ON_PARTIAL_UPDATE,
Boolean.toString(true));
+ }
+ }
+ }
+
@Override
public AtlasAttribute getSystemAttribute(String attributeName) {
return AtlasEntityType.ENTITY_ROOT.allAttributes.get(attributeName);
diff --git a/intg/src/test/java/org/apache/atlas/TestRelationshipUtilsV2.java
b/intg/src/test/java/org/apache/atlas/TestRelationshipUtilsV2.java
index e276a42..65fa62f 100755
--- a/intg/src/test/java/org/apache/atlas/TestRelationshipUtilsV2.java
+++ b/intg/src/test/java/org/apache/atlas/TestRelationshipUtilsV2.java
@@ -32,6 +32,7 @@ import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
@@ -97,6 +98,7 @@ public final class TestRelationshipUtilsV2 {
createOptionalAttrDef("orgLevel", ORG_LEVEL_TYPE),
createOptionalAttrDef("shares", "long"),
createOptionalAttrDef("salary", "double"));
+
employeeType.setOption(AtlasEntityType.OPTION_APPEND_RELATIONSHIPS_ON_PARTIAL_UPDATE,
"[\"friends\"]");
/******* Department Type *******/
AtlasEntityDef departmentType = createClassTypeDef(DEPARTMENT_TYPE,
description(DEPARTMENT_TYPE), superType(null),
createUniqueRequiredAttrDef("name", "string"));
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index b51cbfc..6d8305a 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -37,6 +37,8 @@ import org.apache.atlas.model.instance.AtlasRelationship;
import org.apache.atlas.model.instance.AtlasStruct;
import org.apache.atlas.model.instance.EntityMutationResponse;
import org.apache.atlas.model.instance.EntityMutations.EntityOperation;
+import org.apache.atlas.model.typedef.AtlasEntityDef;
+import
org.apache.atlas.model.typedef.AtlasEntityDef.AtlasRelationshipAttributeDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import
org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality;
import org.apache.atlas.repository.Constants;
@@ -1392,6 +1394,7 @@ public class EntityGraphMapper {
AtlasAttribute inverseRefAttribute =
attribute.getInverseRefAttribute();
Cardinality cardinality =
attribute.getAttributeDef().getCardinality();
List<Object> newElementsCreated = new ArrayList<>();
+ List<Object> allArrayElements = null;
List<Object> currentElements;
if (isReference && !isSoftReference) {
@@ -1442,13 +1445,22 @@ public class EntityGraphMapper {
}
if (isReference && !isSoftReference) {
- List<AtlasEdge> additionalEdges =
removeUnusedArrayEntries(attribute, (List) currentElements, (List)
newElementsCreated, ctx.getReferringVertex());
- newElementsCreated.addAll(additionalEdges);
+ boolean isAppendOnPartialUpdate =
getAppendOptionForRelationship(ctx.getReferringVertex(), attribute.getName());
+
+ if (isAppendOnPartialUpdate) {
+ allArrayElements = unionCurrentAndNewElements(attribute,
(List) currentElements, (List) newElementsCreated);
+ } else {
+ List<AtlasEdge> activeCurrentElements =
removeUnusedArrayEntries(attribute, (List) currentElements, (List)
newElementsCreated, ctx.getReferringVertex());
+
+ allArrayElements = unionCurrentAndNewElements(attribute,
activeCurrentElements, (List) newElementsCreated);
+ }
+ } else {
+ allArrayElements = newElementsCreated;
}
// add index to attributes of array type
- for (int index = 0; index < newElementsCreated.size(); index++) {
- Object element = newElementsCreated.get(index);
+ for (int index = 0; allArrayElements != null && index <
allArrayElements.size(); index++) {
+ Object element = allArrayElements.get(index);
if (element instanceof AtlasEdge) {
AtlasGraphUtilsV2.setEncodedProperty((AtlasEdge) element,
ATTRIBUTE_INDEX_PROPERTY_KEY, index);
@@ -1458,14 +1470,28 @@ public class EntityGraphMapper {
if (isNewElementsNull) {
setArrayElementsProperty(elementType, isSoftReference,
ctx.getReferringVertex(), ctx.getVertexProperty(), null);
} else {
- setArrayElementsProperty(elementType, isSoftReference,
ctx.getReferringVertex(), ctx.getVertexProperty(), newElementsCreated);
+ setArrayElementsProperty(elementType, isSoftReference,
ctx.getReferringVertex(), ctx.getVertexProperty(), allArrayElements);
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== mapArrayValue({})", ctx);
}
- return newElementsCreated;
+ return allArrayElements;
+ }
+
+ private boolean getAppendOptionForRelationship(AtlasVertex entityVertex,
String relationshipAttributeName) {
+ boolean ret = false;
+ String entityTypeName =
AtlasGraphUtilsV2.getTypeName(entityVertex);
+ AtlasEntityDef entityDef =
typeRegistry.getEntityDefByName(entityTypeName);
+ List<AtlasRelationshipAttributeDef> relationshipAttributeDefs =
entityDef.getRelationshipAttributeDefs();
+
+ if (CollectionUtils.isNotEmpty(relationshipAttributeDefs)) {
+ ret =
relationshipAttributeDefs.stream().anyMatch(relationshipAttrDef ->
relationshipAttrDef.getName().equals(relationshipAttributeName)
+ && relationshipAttrDef.isAppendOnPartialUpdate());
+ }
+
+ return ret;
}
private AtlasEdge createVertex(AtlasStruct struct, AtlasVertex
referringVertex, String edgeLabel, EntityMutationContext context) throws
AtlasBaseException {
@@ -1848,6 +1874,17 @@ public class EntityGraphMapper {
return ret;
}
+ private List<AtlasEdge> unionCurrentAndNewElements(AtlasAttribute
attribute, List<AtlasEdge> currentElements, List<AtlasEdge> newElements) {
+ Collection<AtlasEdge> ret = null;
+ AtlasType arrayElementType = ((AtlasArrayType)
attribute.getAttributeType()).getElementType();
+
+ if (arrayElementType != null && isReference(arrayElementType)) {
+ ret = CollectionUtils.union(currentElements, newElements);
+ }
+
+ return CollectionUtils.isNotEmpty(ret) ? new ArrayList<>(ret) :
Collections.emptyList();
+ }
+
//Removes unused edges from the old collection, compared to the new
collection
private List<AtlasEdge> removeUnusedArrayEntries(AtlasAttribute attribute,
List<AtlasEdge> currentEntries, List<AtlasEdge> newEntries, AtlasVertex
entityVertex) throws AtlasBaseException {
diff --git
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityTestBase.java
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityTestBase.java
index 397e2ab..0bc91a7 100644
---
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityTestBase.java
+++
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityTestBase.java
@@ -210,7 +210,7 @@ public class AtlasEntityTestBase extends AtlasTestBase {
Assert.assertTrue(actualList.size() >=
expectedList.size());
for (int i = 0; i < expectedList.size(); i++) {
- validateAttribute(entityExtInfo, actualList.get(i),
expectedList.get(i), elemType, attrName);
+
Assert.assertTrue(actualList.contains(expectedList.get(i)));
}
}
break;
diff --git
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2Test.java
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2Test.java
index 9099924..8bb7d09 100644
---
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2Test.java
+++
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2Test.java
@@ -51,6 +51,7 @@ import org.testng.annotations.Test;
import javax.inject.Inject;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -560,6 +561,38 @@ public abstract class AtlasRelationshipStoreV2Test extends
AtlasTestBase {
verifyRelationshipAttributeList(a2Entity, "manyB",
ImmutableList.of(getAtlasObjectId(b1)));
}
+ @Test
+ public void testRelationshipAttributeOnPartialUpdate() throws Exception {
+ AtlasObjectId maxId = employeeNameIdMap.get("Max");
+ AtlasObjectId janeId = employeeNameIdMap.get("Jane");
+ AtlasObjectId mikeId = employeeNameIdMap.get("Mike");
+ AtlasObjectId johnId = employeeNameIdMap.get("John");
+
+ // Set Max's Employee.friends reference to Jane and apply
the change as a partial update.
+ // This should update the friends lists of both Max and Jane.
+ AtlasEntity maxEntityForUpdate = new AtlasEntity(EMPLOYEE_TYPE);
+ maxEntityForUpdate.setRelationshipAttribute("friends",
Arrays.asList(janeId));
+
+ AtlasEntityType employeeType =
typeRegistry.getEntityTypeByName(EMPLOYEE_TYPE);
+ Map<String, Object> uniqAttributes = Collections.<String,
Object>singletonMap("name", "Max");
+
+ init();
+ EntityMutationResponse updateResponse =
entityStore.updateByUniqueAttributes(employeeType, uniqAttributes, new
AtlasEntityWithExtInfo(maxEntityForUpdate));
+
+ List<AtlasEntityHeader> partialUpdatedEntities =
updateResponse.getPartialUpdatedEntities();
+ assertEquals(partialUpdatedEntities.size(), 2);
+ // 2 entities should have been updated:
+ // * Max to add Employee.friends reference
+ // * Jane to add Max to Employee.friends
+ AtlasEntitiesWithExtInfo updatedEntities =
entityStore.getByIds(ImmutableList.of(maxId.getGuid(), janeId.getGuid()));
+
+ AtlasEntity maxEntity = updatedEntities.getEntity(maxId.getGuid());
+ verifyRelationshipAttributeList(maxEntity, "friends",
ImmutableList.of(mikeId, johnId, janeId));
+
+ AtlasEntity janeEntity = updatedEntities.getEntity(janeId.getGuid());
+ verifyRelationshipAttributeList(janeEntity, "friends",
ImmutableList.of(maxId));
+ }
+
protected abstract void
verifyRelationshipAttributeUpdate_NonComposite_OneToOne(AtlasEntity a1,
AtlasEntity b);
protected abstract void
verifyRelationshipAttributeUpdate_NonComposite_OneToMany(AtlasEntity entity)
throws Exception;