This is an automated email from the ASF dual-hosted git repository. amestry pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new cb2421b ATLAS-3020: Audit APIs for classification updates. new cd51cef Merge branch 'master' of https://gitbox.apache.org/repos/asf/atlas cb2421b is described below commit cb2421bc0eb4f6a9c2d9f6dadf3b9ba0080e3e82 Author: Ashutosh Mestry <ames...@hortonworks.com> AuthorDate: Mon Jan 21 20:34:50 2019 -0800 ATLAS-3020: Audit APIs for classification updates. --- .../main/java/org/apache/atlas/AtlasClientV2.java | 21 +- .../atlas/model/instance/AtlasEntityHeaders.java | 56 ++++ .../audit/CassandraBasedAuditRepository.java | 7 + .../repository/audit/EntityAuditRepository.java | 9 + .../audit/HBaseBasedAuditRepository.java | 52 +++- .../audit/InMemoryEntityAuditRepository.java | 17 ++ .../audit/NoopEntityAuditRepository.java | 7 + .../repository/store/graph/AtlasEntityStore.java | 3 + .../store/graph/v2/AtlasEntityStoreV2.java | 7 + .../store/graph/v2/ClassificationAssociator.java | 316 +++++++++++++++++++++ .../store/graph/v2/EntityGraphRetriever.java | 2 +- .../graph/v2/ClassificationAssociatorTest.java | 235 +++++++++++++++ .../col-entity-None.json | 10 + .../col-entity-PII-FIN_PII.json | 32 +++ .../classification-association/col-entity-PII.json | 22 ++ .../col-entity-T1-prop-Tn.json | 34 +++ .../classification-association/header-FIN_PII.json | 32 +++ .../classification-association/header-None.json | 21 ++ .../header-PII-VENDOR_PII.json | 42 +++ .../classification-association/header-PII.json | 32 +++ .../header-Tx-prop-T1.json | 42 +++ .../json/classification-association/header-Tx.json | 26 ++ .../classification-association/header-empty.json | 3 + .../java/org/apache/atlas/web/rest/EntityREST.java | 59 +++- 24 files changed, 1081 insertions(+), 6 deletions(-) diff --git a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java index 7c8caee..33466e5 100644 --- a/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java +++ b/client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java @@ -27,7 +27,9 @@ import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasClassification.AtlasClassifications; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasRelationship; +import org.apache.atlas.model.instance.AtlasEntityHeaders; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.lineage.AtlasLineageInfo; import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection; @@ -72,6 +74,8 @@ public class AtlasClientV2 extends AtlasBaseClient { // Relationships APIs private static final String RELATIONSHIPS_URI = BASE_URI + "v2/relationship/"; + private static final String BULK_HEADERS = "bulk/headers"; + private static final String BULK_SET_CLASSIFICATIONS = "bulk/setClassifications"; public AtlasClientV2(String[] baseUrl, String[] basicAuthUserNamePassword) { super(baseUrl, basicAuthUserNamePassword); @@ -326,13 +330,26 @@ public class AtlasClientV2 extends AtlasBaseClient { } public void deleteClassifications(String guid, List<AtlasClassification> classifications) throws AtlasServiceException { - callAPI(formatPathParameters(API_V2.GET_CLASSIFICATIONS, guid), AtlasClassifications.class, classifications); + for (AtlasClassification c : classifications) { + callAPI(formatPathParameters(API_V2.DELETE_CLASSIFICATION, guid, c.getTypeName()), AtlasClassifications.class, classifications); + } } public void deleteClassification(String guid, String classificationName) throws AtlasServiceException { callAPI(formatPathParameters(API_V2.DELETE_CLASSIFICATION, guid, classificationName), null, null); } + public AtlasEntityHeaders getEntityHeaders(long tagUpdateStartTime) throws AtlasServiceException { + MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl(); + queryParams.add("tagUpdateStartTime", Long.toString(tagUpdateStartTime)); + + return callAPI(API_V2.GET_BULK_HEADERS, AtlasEntityHeaders.class, queryParams); + } + + public String setClassifications(AtlasEntityHeaders entityHeaders) throws AtlasServiceException { + return callAPI(API_V2.UPDATE_BULK_SET_CLASSIFICATIONS, String.class, entityHeaders); + } + /* Discovery calls */ public AtlasSearchResult dslSearch(final String query) throws AtlasServiceException { MultivaluedMap<String, String> queryParams = new MultivaluedMapImpl(); @@ -480,6 +497,8 @@ public class AtlasClientV2 extends AtlasBaseClient { public static final API_V2 DELETE_RELATIONSHIP_BY_GUID = new API_V2(RELATIONSHIPS_URI + "guid/", HttpMethod.DELETE, Response.Status.NO_CONTENT); public static final API_V2 CREATE_RELATIONSHIP = new API_V2(RELATIONSHIPS_URI , HttpMethod.POST, Response.Status.OK); public static final API_V2 UPDATE_RELATIONSHIP = new API_V2(RELATIONSHIPS_URI , HttpMethod.PUT, Response.Status.OK); + public static final API_V2 GET_BULK_HEADERS = new API_V2(ENTITY_API + BULK_HEADERS, HttpMethod.GET, Response.Status.OK); + public static final API_V2 UPDATE_BULK_SET_CLASSIFICATIONS = new API_V2(ENTITY_API + AtlasClientV2.BULK_SET_CLASSIFICATIONS, HttpMethod.POST, Response.Status.OK); private API_V2(String path, String method, Response.Status status) { super(path, method, status); diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeaders.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeaders.java new file mode 100644 index 0000000..11c6fc7 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntityHeaders.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.model.instance; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +import java.util.Map; + +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; + +@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE) +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown=true) +@XmlRootElement +@XmlAccessorType(XmlAccessType.PROPERTY) +public class AtlasEntityHeaders { + Map<String, AtlasEntityHeader> guidHeaderMap; + + public AtlasEntityHeaders() { + } + + public AtlasEntityHeaders(Map<String, AtlasEntityHeader> guidEntityHeaderMap) { + guidHeaderMap = guidEntityHeaderMap; + } + + public void setGuidHeaderMap(Map<String, AtlasEntityHeader> guidHeaderMap) { + this.guidHeaderMap = guidHeaderMap; + } + + public Map<String, AtlasEntityHeader> getGuidHeaderMap() { + return guidHeaderMap; + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java index eb78f8f..b8131bd 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/CassandraBasedAuditRepository.java @@ -30,6 +30,7 @@ import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.annotation.ConditionalOnAtlasProperty; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.audit.EntityAuditEventV2; +import org.apache.commons.lang.NotImplementedException; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import javax.inject.Singleton; @@ -188,6 +190,11 @@ public class CassandraBasedAuditRepository extends AbstractStorageBasedAuditRepo } @Override + public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException { + throw new NotImplementedException(); + } + + @Override public void start() throws AtlasException { initApplicationProperties(); initializeSettings(); diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java index aab2d5b..2a47e39 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java @@ -24,6 +24,7 @@ import org.apache.atlas.model.audit.EntityAuditEventV2; import org.apache.atlas.exception.AtlasBaseException; import java.util.List; +import java.util.Set; /** * Interface for repository for storing entity audit events @@ -77,6 +78,14 @@ public interface EntityAuditRepository { */ List<EntityAuditEventV2> listEventsV2(String entityId, String startKey, short n) throws AtlasBaseException; + /*** + * List events for given time range where classifications have been added, deleted or updated. + * @param fromTimestamp from timestamp + * @param toTimestamp to timestamp + * @return events that match the range + * @throws AtlasBaseException + */ + Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException; /** * List events for the given entity id in decreasing order of timestamp, from the given timestamp. Returns n results diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java index 6f4415f..5f01293 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java @@ -50,7 +50,10 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.BinaryPrefixComparator; +import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.PageFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.regionserver.BloomType; @@ -64,9 +67,10 @@ import javax.inject.Singleton; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.Iterator; +import java.util.HashSet; import java.util.List; import java.util.Properties; +import java.util.Set; import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_ADD; import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_DELETE; @@ -546,6 +550,52 @@ public class HBaseBasedAuditRepository extends AbstractStorageBasedAuditReposito } @Override + public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException { + final String classificationUpdatesAction = "CLASSIFICATION_"; + + if (LOG.isDebugEnabled()) { + LOG.debug("Listing events for fromTimestamp {}, toTimestamp {}, action {}", fromTimestamp, toTimestamp); + } + + Table table = null; + ResultScanner scanner = null; + + try { + Set<String> guids = new HashSet<>(); + + table = connection.getTable(tableName); + + byte[] filterValue = Bytes.toBytes(classificationUpdatesAction); + BinaryPrefixComparator binaryPrefixComparator = new BinaryPrefixComparator(filterValue); + SingleColumnValueFilter filter = new SingleColumnValueFilter(COLUMN_FAMILY, COLUMN_ACTION, CompareFilter.CompareOp.EQUAL, binaryPrefixComparator); + Scan scan = new Scan().setFilter(filter).setTimeRange(fromTimestamp, toTimestamp); + + Result result; + scanner = table.getScanner(scan); + while ((result = scanner.next()) != null) { + EntityAuditEvent event = fromKey(result.getRow()); + + if (event == null) { + continue; + } + + guids.add(event.getEntityId()); + } + + return guids; + } catch (IOException e) { + throw new AtlasBaseException(e); + } finally { + try { + close(scanner); + close(table); + } catch (AtlasException e) { + throw new AtlasBaseException(e); + } + } + } + + @Override public void start() throws AtlasException { Configuration configuration = ApplicationProperties.get(); startInternal(configuration, getHBaseConfiguration(configuration)); diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java index dca3b85..ad6ec94 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java @@ -21,6 +21,7 @@ package org.apache.atlas.repository.audit; import org.apache.atlas.AtlasException; import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.annotation.ConditionalOnAtlasProperty; +import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.audit.EntityAuditEventV2; import org.apache.commons.collections.CollectionUtils; import org.springframework.stereotype.Component; @@ -28,7 +29,9 @@ import org.springframework.stereotype.Component; import javax.inject.Singleton; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -119,6 +122,20 @@ public class InMemoryEntityAuditRepository implements EntityAuditRepository { } @Override + public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException { + Set<String> events = new HashSet<>(); + + for (EntityAuditEventV2 event : auditEventsV2.values()) { + long timestamp = event.getTimestamp(); + if (timestamp > fromTimestamp && timestamp <= toTimestamp) { + events.add(event.getEntityId()); + } + } + + return events; + } + + @Override public List<Object> listEvents(String entityId, String startKey, short maxResults) { List events = listEventsV2(entityId, startKey, maxResults); diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java index e3a6078..4bb68d5 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java @@ -20,12 +20,14 @@ package org.apache.atlas.repository.audit; import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.annotation.ConditionalOnAtlasProperty; +import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.audit.EntityAuditEventV2; import org.springframework.stereotype.Component; import javax.inject.Singleton; import java.util.Collections; import java.util.List; +import java.util.Set; /** * Implementation that completely disables the audit repository. @@ -66,6 +68,11 @@ public class NoopEntityAuditRepository implements EntityAuditRepository { } @Override + public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException { + return Collections.emptySet(); + } + + @Override public List<Object> listEvents(String entityId, String startKey, short n) { return Collections.emptyList(); } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java index 750fa17..1da1138 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java @@ -24,6 +24,7 @@ import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.AtlasEntityHeaders; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.repository.store.graph.v2.EntityStream; @@ -216,4 +217,6 @@ public interface AtlasEntityStore { List<AtlasClassification> getClassifications(String guid) throws AtlasBaseException; AtlasClassification getClassification(String guid, String classificationName) throws AtlasBaseException; + + String setClassifications(AtlasEntityHeaders entityHeaders); } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java index 35aa3af..e8e5400 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java @@ -657,6 +657,13 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore { return ret; } + @Override + @GraphTransaction + public String setClassifications(AtlasEntityHeaders entityHeaders) { + ClassificationAssociator.Updater associator = new ClassificationAssociator.Updater(typeRegistry, this); + return associator.setClassifications(entityHeaders.getGuidHeaderMap()); + } + private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications) throws AtlasBaseException { if (LOG.isDebugEnabled()) { LOG.debug("==> createOrUpdate()"); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociator.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociator.java new file mode 100644 index 0000000..11d008f --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociator.java @@ -0,0 +1,316 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.store.graph.v2; + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasClassification; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.AtlasEntityHeaders; +import org.apache.atlas.repository.audit.EntityAuditRepository; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +@Component +public class ClassificationAssociator { + private static final Logger LOG = LoggerFactory.getLogger(ClassificationAssociator.class); + + public static class Retriever { + private final EntityAuditRepository auditRepository; + private final EntityGraphRetriever entityRetriever; + + public Retriever(AtlasTypeRegistry typeRegistry, EntityAuditRepository auditRepository) { + this.entityRetriever = new EntityGraphRetriever(typeRegistry); + this.auditRepository = auditRepository; + } + + Retriever(EntityGraphRetriever entityGraphRetriever, EntityAuditRepository auditRepository) { + this.entityRetriever = entityGraphRetriever; + this.auditRepository = auditRepository; + } + + public AtlasEntityHeaders get(long fromTimestamp, long toTimestamp) throws AtlasBaseException { + toTimestamp = incrementTimestamp(toTimestamp); + Set<String> guids = auditRepository.getEntitiesWithTagChanges(fromTimestamp, toTimestamp); + + Map<String, AtlasEntityHeader> guidEntityHeaderMap = new HashMap<>(); + for (String guid : guids) { + AtlasEntityHeader entityHeader = entityRetriever.toAtlasEntityHeaderWithClassifications(guid); + + guidEntityHeaderMap.put(guid, entityHeader); + } + + guids.clear(); + return new AtlasEntityHeaders(guidEntityHeaderMap); + } + + private long incrementTimestamp(long t) { + return t + 1; + } + } + + public static class Updater { + static final String ATTR_NAME_QUALIFIED_NAME = "qualifiedName"; + static final String STATUS_DONE = "(Done)"; + static final String STATUS_SKIPPED = "(Skipped)"; + static final String STATUS_PARTIAL = "(Partial)"; + + private static final String PROCESS_FORMAT = "%s:%s:%s:%s -> %s:%s"; + static final String PROCESS_ADD = "Add"; + static final String PROCESS_UPDATE = "Update"; + static final String PROCESS_DELETE = "Delete"; + static final String JSONIFY_STRING_FORMAT = "\"%s\","; + + private final AtlasTypeRegistry typeRegistry; + private final AtlasEntityStore entitiesStore; + private final EntityGraphRetriever entityRetriever; + private StringBuilder actionSummary = new StringBuilder(); + + public Updater(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore) { + this.typeRegistry = typeRegistry; + this.entitiesStore = entitiesStore; + entityRetriever = new EntityGraphRetriever(typeRegistry); + } + + public String setClassifications(Map<String, AtlasEntityHeader> map) { + for (AtlasEntityHeader incomingEntityHeader : map.values()) { + String typeName = incomingEntityHeader.getTypeName(); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); + if (entityType == null) { + LOG.warn("Entity type: {}: Not found: {}!", typeName, STATUS_SKIPPED); + summarizeFormat("%s: %s", typeName, STATUS_SKIPPED); + continue; + } + + String qualifiedName = getQualifiedName(incomingEntityHeader); + AtlasEntityHeader entityToBeChanged = getByUniqueAttributes(entityType, qualifiedName, incomingEntityHeader.getAttributes()); + if (entityToBeChanged == null) { + summarizeFormat("Entity:%s:%s:[Not found]:%s", entityType.getTypeName(), qualifiedName, STATUS_SKIPPED); + continue; + } + + + String guid = entityToBeChanged.getGuid(); + Map<String, List<AtlasClassification>> operationListMap = computeChanges(incomingEntityHeader, entityToBeChanged); + commitChanges(guid, typeName, qualifiedName, operationListMap); + } + + return getJsonArray(actionSummary); + } + + private void commitChanges(String entityGuid, String typeName, String qualifiedName, + Map<String, List<AtlasClassification>> operationListMap) { + if (MapUtils.isEmpty(operationListMap)) { + return; + } + + deleteClassifications(entityGuid, typeName, qualifiedName, operationListMap.get(PROCESS_DELETE)); + updateClassifications(entityGuid, typeName, qualifiedName, operationListMap.get(PROCESS_UPDATE)); + addClassifications(entityGuid, typeName, qualifiedName, operationListMap.get(PROCESS_ADD)); + + operationListMap.clear(); + } + + private Map<String, List<AtlasClassification>> computeChanges(AtlasEntityHeader incomingEntityHeader, AtlasEntityHeader entityToBeUpdated) { + if (incomingEntityHeader == null || entityToBeUpdated == null) { + return null; + } + + ListOps<AtlasClassification> listOps = new ListOps<>(); + List<AtlasClassification> incomingClassifications = listOps.filter(incomingEntityHeader.getGuid(), incomingEntityHeader.getClassifications()); + List<AtlasClassification> entityClassifications = listOps.filter(entityToBeUpdated.getGuid(), entityToBeUpdated.getClassifications()); + + if (CollectionUtils.isEmpty(incomingClassifications) && CollectionUtils.isEmpty(entityClassifications)) { + return null; + } + + Map<String, List<AtlasClassification>> operationListMap = new HashMap<>(); + + bucket(PROCESS_DELETE, operationListMap, listOps.subtract(entityClassifications, incomingClassifications)); + bucket(PROCESS_UPDATE, operationListMap, listOps.intersect(incomingClassifications, entityClassifications)); + bucket(PROCESS_ADD, operationListMap, listOps.subtract(incomingClassifications, entityClassifications)); + + return operationListMap; + } + + private void bucket(String op, Map<String, List<AtlasClassification>> operationListMap, List<AtlasClassification> results) { + if (CollectionUtils.isEmpty(results)) { + return; + } + + operationListMap.put(op, results); + } + + private void addClassifications(String entityToBeChangedGuid, String typeName, String qualifiedName, List<AtlasClassification> list) { + if (CollectionUtils.isEmpty(list)) { + return; + } + + String status = STATUS_DONE; + String classificationNames = getClassificationNames(list); + try { + entitiesStore.addClassifications(entityToBeChangedGuid, list); + } catch (AtlasBaseException e) { + status = STATUS_PARTIAL; + LOG.warn("{}:{}:{} -> {}: {}.", PROCESS_UPDATE, typeName, qualifiedName, classificationNames, status); + } + + summarize(PROCESS_ADD, entityToBeChangedGuid, typeName, qualifiedName, classificationNames, status); + } + + private void updateClassifications(String entityToBeChangedGuid, String typeName, String qualifiedName, List<AtlasClassification> list) { + if (CollectionUtils.isEmpty(list)) { + return; + } + + String status = STATUS_DONE; + String classificationNames = getClassificationNames(list); + + try { + entitiesStore.updateClassifications(entityToBeChangedGuid, list); + } catch (AtlasBaseException e) { + status = STATUS_PARTIAL; + LOG.warn("{}:{}:{} -> {}: {}.", PROCESS_UPDATE, typeName, qualifiedName, classificationNames, status); + } + + summarize(PROCESS_UPDATE, entityToBeChangedGuid, typeName, qualifiedName, classificationNames, status); + } + + private void deleteClassifications(String typeName, String entityGuid, String qualifiedName, List<AtlasClassification> list) { + if (CollectionUtils.isEmpty(list)) { + return; + } + + String status = STATUS_DONE; + String classificationTypeName = getClassificationNames(list); + for (AtlasClassification c : list) { + try { + entitiesStore.deleteClassification(entityGuid, c.getTypeName()); + } catch (AtlasBaseException e) { + status = STATUS_PARTIAL; + LOG.warn("{}:{}:{} -> {}: Skipped!", entityGuid, typeName, qualifiedName, c.getTypeName()); + } + } + + summarize(PROCESS_DELETE, entityGuid, typeName, qualifiedName, classificationTypeName, status); + } + + AtlasEntityHeader getByUniqueAttributes(AtlasEntityType entityType, String qualifiedName, Map<String, Object> attrValues) { + try { + AtlasVertex vertex = AtlasGraphUtilsV2.findByUniqueAttributes(entityType, attrValues); + if (vertex == null) { + return null; + } + + return entityRetriever.toAtlasEntityHeaderWithClassifications(vertex); + } catch (AtlasBaseException e) { + LOG.warn("{}:{} could not be processed!", entityType, qualifiedName); + return null; + } catch (Exception ex) { + LOG.error("{}:{} could not be processed!", entityType, qualifiedName, ex); + return null; + } + } + + private String getClassificationNames(List<AtlasClassification> list) { + return list.stream().map(AtlasClassification::getTypeName).collect(Collectors.joining(", ")); + } + + private String getQualifiedName(AtlasEntityHeader entityHeader) { + return (String) entityHeader.getAttribute(ATTR_NAME_QUALIFIED_NAME); + } + + private void summarize(String... s) { + summarizeFormat(PROCESS_FORMAT, s); + } + + private void summarizeFormat(String format, String... s) { + summarize(String.format(format, s)); + } + + private void summarize(String s) { + actionSummary.append(String.format(JSONIFY_STRING_FORMAT, s)); + } + + private String getJsonArray(StringBuilder actionSummary) { + return "[" + StringUtils.removeEnd(actionSummary.toString(), ",") + "]"; + } + } + + private static class ListOps<V extends AtlasClassification> { + public List<V> intersect(List<V> lhs, List<V> rhs) { + if (CollectionUtils.isEmpty(rhs)) { + return null; + } + + List<V> result = new ArrayList<>(); + for (V c : rhs) { + V found = findFrom(lhs, c); + if (found != null) { + result.add(found); + } + } + + return result; + } + + public List<V> subtract(List<V> lhs, List<V> rhs) { + if (CollectionUtils.isEmpty(lhs)) { + return null; + } + + List<V> result = new ArrayList<>(); + for (V c : lhs) { + V found = findFrom(rhs, c); + if (found == null) { + result.add(c); + } + } + + return result; + } + + private V findFrom(List<V> reference, V check) { + return (V) CollectionUtils.find(reference, ox -> + ((V) ox).getTypeName().equals(check.getTypeName())); + } + + public List<V> filter(String guid, List<V> list) { + if (CollectionUtils.isEmpty(list)) { + return list; + } + + return list.stream().filter(x -> x.getEntityGuid().equals(guid)).collect(Collectors.toList()); + } + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java index 066abc1..79216d5 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java @@ -95,7 +95,7 @@ import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelation import static org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.OUT; @Component -public final class EntityGraphRetriever { +public class EntityGraphRetriever { private static final Logger LOG = LoggerFactory.getLogger(EntityGraphRetriever.class); private static final String TERM_RELATION_NAME = "AtlasGlossarySemanticAssignment"; diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociatorTest.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociatorTest.java new file mode 100644 index 0000000..ab5bb2b --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/ClassificationAssociatorTest.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.store.graph.v2; + +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.AtlasEntityHeaders; +import org.apache.atlas.repository.audit.EntityAuditRepository; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.utils.AtlasJson; +import org.apache.atlas.utils.TestResourceFileUtils; +import org.apache.commons.lang.StringUtils; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.PROCESS_ADD; +import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.PROCESS_DELETE; +import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.PROCESS_UPDATE; +import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.STATUS_DONE; +import static org.apache.atlas.repository.store.graph.v2.ClassificationAssociator.Updater.STATUS_SKIPPED; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import static org.testng.FileAssert.fail; + +public class ClassificationAssociatorTest { + private static final String TABLE_GUID = "df122fc3-5555-40f8-a30f-3090b8a622f8"; + private static String TEST_FILES_SUBDIR = "classification-association"; + private static String MESSAGE_SEPARATOR = ":"; + private static String ENTITY_NAME_SEPARATOR = "->"; + + private class ClassificationAssociatorUpdaterForSpy extends ClassificationAssociator.Updater { + private final String entityFileName; + + public ClassificationAssociatorUpdaterForSpy(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore) { + super(typeRegistry, entitiesStore); + this.entityFileName = StringUtils.EMPTY; + } + + public ClassificationAssociatorUpdaterForSpy(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore, String entityFileName) { + super(typeRegistry, entitiesStore); + this.entityFileName = entityFileName; + } + + @Override + AtlasEntityHeader getByUniqueAttributes(AtlasEntityType entityType, String qualifiedName, Map<String, Object> attrValues) { + try { + if (StringUtils.isEmpty(entityFileName)) { + return null; + } + + return getEntityHeaderFromFile(entityFileName); + } catch (IOException e) { + fail(entityFileName + " could not be loaded."); + return null; + } + } + } + + @Test + public void auditScanYieldsNothing_EmptyHeadersReturned() { + AtlasEntityHeaders actualEntityHeaders = setupRetriever("header-empty", 0, 0, null); + + assertNotNull(actualEntityHeaders); + assertEquals(actualEntityHeaders.getGuidHeaderMap().size(),0); + } + + @Test + public void auditScanYieldsOneEntity_EntityHeadersHasOneElementWithClassification() { + AtlasEntityHeaders actualEntityHeaders = setupRetriever("header-Tx", 0, 0, TABLE_GUID); + + assertNotNull(actualEntityHeaders); + assertEquals(actualEntityHeaders.getGuidHeaderMap().size(), 1); + assertTrue(actualEntityHeaders.getGuidHeaderMap().containsKey(TABLE_GUID)); + assertEquals(actualEntityHeaders.getGuidHeaderMap().get(TABLE_GUID).getGuid(), TABLE_GUID); + assertNotNull(actualEntityHeaders.getGuidHeaderMap().get(TABLE_GUID).getClassifications()); + assertEquals(actualEntityHeaders.getGuidHeaderMap().get(TABLE_GUID).getClassifications().size(), 1); + } + + private AtlasEntityHeaders setupRetriever(String headersFile, int fromTimestamp, int toTimestamp, final String tableGuid) { + AtlasEntityHeader entityHeaderWithClassification = null; + try { + Set<String> guids = new HashSet<String>(); + entityHeaderWithClassification = TestResourceFileUtils.readObjectFromJson(TEST_FILES_SUBDIR, headersFile, AtlasEntityHeader.class); + if (!StringUtils.isEmpty(tableGuid)) { + guids.add(tableGuid); + } + + EntityAuditRepository auditRepository = mock(EntityAuditRepository.class); + when(auditRepository.getEntitiesWithTagChanges(anyLong(), anyLong())).thenReturn(guids); + + EntityGraphRetriever entityGraphRetriever = mock(EntityGraphRetriever.class); + when(entityGraphRetriever.toAtlasEntityHeaderWithClassifications(TABLE_GUID)).thenReturn(entityHeaderWithClassification); + + ClassificationAssociator.Retriever retriever = new ClassificationAssociator.Retriever(entityGraphRetriever, auditRepository); + return retriever.get(fromTimestamp, toTimestamp); + } + catch (Exception ex) { + fail("Exception!"); + return null; + } + } + + @Test + public void updaterIncorrectType_ReturnsError() throws IOException { + AtlasEntityHeaders entityHeaderMap = getEntityHeaderMapFromFile("header-PII"); + AtlasEntityStore entitiesStore = mock(AtlasEntityStore.class); + + AtlasTypeRegistry typeRegistry = mock(AtlasTypeRegistry.class); + when(typeRegistry.getEntityTypeByName(anyString())).thenReturn(null); + + ClassificationAssociator.Updater updater = new ClassificationAssociator.Updater(typeRegistry, entitiesStore); + String summary = updater.setClassifications(entityHeaderMap.getGuidHeaderMap()); + + assertTrue(summary.contains("hive_")); + assertTrue(summary.contains(STATUS_SKIPPED)); + } + + @Test + public void updaterCorrectTypeEntityNotFound_Skipped() throws IOException { + AtlasEntityHeaders entityHeaderMap = getEntityHeaderMapFromFile("header-PII"); + AtlasEntityType hiveTable = mock(AtlasEntityType.class); + AtlasEntityStore entitiesStore = mock(AtlasEntityStore.class); + AtlasTypeRegistry typeRegistry = mock(AtlasTypeRegistry.class); + + when(typeRegistry.getEntityTypeByName(anyString())).thenReturn(hiveTable); + when(hiveTable.getTypeName()).thenReturn("hive_column"); + + ClassificationAssociatorUpdaterForSpy updater = new ClassificationAssociatorUpdaterForSpy(typeRegistry, entitiesStore); + String summary = updater.setClassifications(entityHeaderMap.getGuidHeaderMap()); + + TypeReference<String[]> typeReference = new TypeReference<String[]>() {}; + String[] summaryArray = AtlasJson.fromJson(summary, typeReference); + assertEquals(summaryArray.length, 1); + assertSummaryElement(summaryArray[0], "Entity", STATUS_SKIPPED, ""); + } + + @Test + public void updaterTests() throws IOException { + updaterAssert("header-PII", "col-entity-None", PROCESS_ADD + ":PII"); + updaterAssert("header-PII", "col-entity-PII", new String[]{PROCESS_UPDATE + ":PII"}); + updaterAssert("header-None", "col-entity-PII", new String[]{PROCESS_DELETE + ":PII"}); + updaterAssert("header-PII-VENDOR_PII", "col-entity-PII-FIN_PII", + PROCESS_DELETE + ":FIN_PII", + PROCESS_UPDATE + ":PII", + PROCESS_ADD + ":VENDOR_PII"); + updaterAssert("header-None", "col-entity-None", new String[]{}); + updaterAssert("header-FIN_PII", "col-entity-PII", + PROCESS_DELETE + ":PII", + PROCESS_ADD + ":FIN_PII"); + } + + @Test + public void updater_filterPropagatedClassifications() throws IOException { + updaterAssert("header-Tx-prop-T1", "col-entity-T1-prop-Tn", + PROCESS_DELETE + ":T1", + PROCESS_ADD + ":Tx"); + } + + + private void assertSummaryElement(String summaryElement, String operation, String status, String classificationName) { + String[] splits = StringUtils.split(summaryElement, MESSAGE_SEPARATOR); + String[] nameSplits = StringUtils.split(splits[3], ENTITY_NAME_SEPARATOR); + if (nameSplits.length > 1) { + assertEquals(nameSplits[1].trim(), classificationName); + } + + assertEquals(splits[0], operation); + assertEquals(splits[4], status); + } + + private String[] setupUpdater(String entityHeaderFileName, String entityFileName, int expectedSummaryLength) throws IOException { + AtlasEntityHeaders entityHeaderMap = getEntityHeaderMapFromFile(entityHeaderFileName); + + AtlasEntityType hiveTable = mock(AtlasEntityType.class); + AtlasEntityStore entitiesStore = mock(AtlasEntityStore.class); + AtlasTypeRegistry typeRegistry = mock(AtlasTypeRegistry.class); + + when(typeRegistry.getEntityTypeByName(anyString())).thenReturn(hiveTable); + when(hiveTable.getTypeName()).thenReturn("hive_column"); + + ClassificationAssociatorUpdaterForSpy updater = new ClassificationAssociatorUpdaterForSpy(typeRegistry, entitiesStore, entityFileName); + String summary = updater.setClassifications(entityHeaderMap.getGuidHeaderMap()); + + TypeReference<String[]> typeReference = new TypeReference<String[]>() {}; + String[] summaryArray = AtlasJson.fromJson(summary, typeReference); + assertEquals(summaryArray.length, expectedSummaryLength); + + return summaryArray; + } + + private AtlasEntityHeader getEntityHeaderFromFile(String entityJson) throws IOException { + return TestResourceFileUtils.readObjectFromJson(TEST_FILES_SUBDIR, entityJson, AtlasEntityHeader.class); + } + + private AtlasEntityHeaders getEntityHeaderMapFromFile(String filename) throws IOException { + return TestResourceFileUtils.readObjectFromJson(TEST_FILES_SUBDIR, filename, AtlasEntityHeaders.class); + } + + private void updaterAssert(String incoming, String entity, String... opNamePair) throws IOException { + String[] summary = setupUpdater(incoming, entity, opNamePair.length); + + for (int i = 0; i < opNamePair.length; i++) { + String s = opNamePair[i]; + String[] splits = StringUtils.split(s, ":"); + assertSummaryElement(summary[i], splits[0], STATUS_DONE, splits[1]); + } + } +} diff --git a/repository/src/test/resources/json/classification-association/col-entity-None.json b/repository/src/test/resources/json/classification-association/col-entity-None.json new file mode 100644 index 0000000..6c04a3d --- /dev/null +++ b/repository/src/test/resources/json/classification-association/col-entity-None.json @@ -0,0 +1,10 @@ +{ + "typeName": "hive_column", + "attributes": { + "owner": "hive", + "qualifiedName": "hortoniabank.us_customers.nationalid@cl1", + "name": "nationalid" + }, + "guid": "0ce68113-77fe-4ed1-9585-69371202bd74", + "status": "ACTIVE" +} diff --git a/repository/src/test/resources/json/classification-association/col-entity-PII-FIN_PII.json b/repository/src/test/resources/json/classification-association/col-entity-PII-FIN_PII.json new file mode 100644 index 0000000..283d863 --- /dev/null +++ b/repository/src/test/resources/json/classification-association/col-entity-PII-FIN_PII.json @@ -0,0 +1,32 @@ +{ + "typeName": "hive_column", + "attributes": { + "owner": "hive", + "qualifiedName": "hortoniabank.us_customers.nationalid@cl1", + "name": "nationalid" + }, + "guid": "0ce68113-77fe-4ed1-9585-69371202bd74", + "status": "ACTIVE", + "classifications": [ + { + "typeName": "PII", + "attributes": { + "type": "ssn" + }, + "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74", + "entityStatus": "ACTIVE", + "propagate": true, + "removePropagationsOnEntityDelete": false + }, + { + "typeName": "FIN_PII", + "attributes": { + "type": "ssn" + }, + "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74", + "entityStatus": "ACTIVE", + "propagate": true, + "removePropagationsOnEntityDelete": false + } + ] +} diff --git a/repository/src/test/resources/json/classification-association/col-entity-PII.json b/repository/src/test/resources/json/classification-association/col-entity-PII.json new file mode 100644 index 0000000..af34e7d --- /dev/null +++ b/repository/src/test/resources/json/classification-association/col-entity-PII.json @@ -0,0 +1,22 @@ +{ + "typeName": "hive_column", + "attributes": { + "owner": "hive", + "qualifiedName": "hortoniabank.us_customers.nationalid@cl1", + "name": "nationalid" + }, + "guid": "0ce68113-77fe-4ed1-9585-69371202bd74", + "status": "ACTIVE", + "classifications": [ + { + "typeName": "PII", + "attributes": { + "type": "ssn" + }, + "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74", + "entityStatus": "ACTIVE", + "propagate": true, + "removePropagationsOnEntityDelete": false + } + ] +} diff --git a/repository/src/test/resources/json/classification-association/col-entity-T1-prop-Tn.json b/repository/src/test/resources/json/classification-association/col-entity-T1-prop-Tn.json new file mode 100644 index 0000000..4f9cbd2 --- /dev/null +++ b/repository/src/test/resources/json/classification-association/col-entity-T1-prop-Tn.json @@ -0,0 +1,34 @@ +{ + "typeName": "hive_column", + "attributes": { + "owner": "hive", + "createTime": 1547071410000, + "qualifiedName": "stocks.daily@cl1", + "name": "daily" + }, + "guid": "df122fc3-5555-40f8-a30f-3090b8a622f8", + "status": "ACTIVE", + "displayText": "daily", + "classifications": [ + { + "typeName": "T1", + "attributes": {}, + "entityGuid": "df122fc3-5555-40f8-a30f-3090b8a622f8", + "entityStatus": "ACTIVE", + "propagate": false, + "validityPeriods": [], + "removePropagationsOnEntityDelete": false + }, + { + "typeName": "Tn", + "attributes": {}, + "entityGuid": "22222222-5555-40f8-a30f-3090b8a622f8", + "entityStatus": "ACTIVE", + "propagate": false, + "validityPeriods": [], + "removePropagationsOnEntityDelete": false + } + ], + "meaningNames": [], + "meanings": [] +} diff --git a/repository/src/test/resources/json/classification-association/header-FIN_PII.json b/repository/src/test/resources/json/classification-association/header-FIN_PII.json new file mode 100644 index 0000000..35bb6db --- /dev/null +++ b/repository/src/test/resources/json/classification-association/header-FIN_PII.json @@ -0,0 +1,32 @@ +{ + "guidHeaderMap": { + "0ce68113-77fe-4ed1-9585-69371202bd74": { + "typeName": "hive_column", + "attributes": { + "owner": "hive", + "qualifiedName": "hortoniabank.us_customers.nationalid@cl1", + "name": "nationalid" + }, + "guid": "0ce68113-77fe-4ed1-9585-69371202bd74", + "status": "ACTIVE", + "displayText": "nationalid", + "classificationNames": [ + "FIN_PII" + ], + "classifications": [ + { + "typeName": "FIN_PII", + "attributes": { + "type": "ssn" + }, + "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74", + "entityStatus": "ACTIVE", + "propagate": true, + "removePropagationsOnEntityDelete": false + } + ], + "meaningNames": [], + "meanings": [] + } + } +} diff --git a/repository/src/test/resources/json/classification-association/header-None.json b/repository/src/test/resources/json/classification-association/header-None.json new file mode 100644 index 0000000..a858990 --- /dev/null +++ b/repository/src/test/resources/json/classification-association/header-None.json @@ -0,0 +1,21 @@ +{ + "guidHeaderMap": { + "0ce68113-77fe-4ed1-9585-69371202bd74": { + "typeName": "hive_column", + "attributes": { + "owner": "hive", + "qualifiedName": "hortoniabank.us_customers.nationalid@cl1", + "name": "nationalid" + }, + "guid": "0ce68113-77fe-4ed1-9585-69371202bd74", + "status": "ACTIVE", + "displayText": "nationalid", + "classificationNames": [ + ], + "classifications": [ + ], + "meaningNames": [], + "meanings": [] + } + } +} diff --git a/repository/src/test/resources/json/classification-association/header-PII-VENDOR_PII.json b/repository/src/test/resources/json/classification-association/header-PII-VENDOR_PII.json new file mode 100644 index 0000000..58638f7 --- /dev/null +++ b/repository/src/test/resources/json/classification-association/header-PII-VENDOR_PII.json @@ -0,0 +1,42 @@ +{ + "guidHeaderMap": { + "0ce68113-77fe-4ed1-9585-69371202bd74": { + "typeName": "hive_column", + "attributes": { + "owner": "hive", + "qualifiedName": "hortoniabank.us_customers.nationalid@cl1", + "name": "nationalid" + }, + "guid": "0ce68113-77fe-4ed1-9585-69371202bd74", + "status": "ACTIVE", + "displayText": "nationalid", + "classificationNames": [ + "PII" + ], + "classifications": [ + { + "typeName": "PII", + "attributes": { + "type": "ssn" + }, + "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74", + "entityStatus": "ACTIVE", + "propagate": true, + "removePropagationsOnEntityDelete": false + }, + { + "typeName": "VENDOR_PII", + "attributes": { + "type": "ssn" + }, + "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74", + "entityStatus": "ACTIVE", + "propagate": true, + "removePropagationsOnEntityDelete": false + } + ], + "meaningNames": [], + "meanings": [] + } + } +} diff --git a/repository/src/test/resources/json/classification-association/header-PII.json b/repository/src/test/resources/json/classification-association/header-PII.json new file mode 100644 index 0000000..bfc6d2e --- /dev/null +++ b/repository/src/test/resources/json/classification-association/header-PII.json @@ -0,0 +1,32 @@ +{ + "guidHeaderMap": { + "0ce68113-77fe-4ed1-9585-69371202bd74": { + "typeName": "hive_column", + "attributes": { + "owner": "hive", + "qualifiedName": "hortoniabank.us_customers.nationalid@cl1", + "name": "nationalid" + }, + "guid": "0ce68113-77fe-4ed1-9585-69371202bd74", + "status": "ACTIVE", + "displayText": "nationalid", + "classificationNames": [ + "PII" + ], + "classifications": [ + { + "typeName": "PII", + "attributes": { + "type": "ssn" + }, + "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74", + "entityStatus": "ACTIVE", + "propagate": true, + "removePropagationsOnEntityDelete": false + } + ], + "meaningNames": [], + "meanings": [] + } + } +} diff --git a/repository/src/test/resources/json/classification-association/header-Tx-prop-T1.json b/repository/src/test/resources/json/classification-association/header-Tx-prop-T1.json new file mode 100644 index 0000000..8f2f26f --- /dev/null +++ b/repository/src/test/resources/json/classification-association/header-Tx-prop-T1.json @@ -0,0 +1,42 @@ +{ + "guidHeaderMap": { + "0ce68113-77fe-4ed1-9585-69371202bd74": { + "typeName": "hive_column", + "attributes": { + "owner": "hive", + "qualifiedName": "hortoniabank.us_customers.nationalid@cl1", + "name": "nationalid" + }, + "guid": "0ce68113-77fe-4ed1-9585-69371202bd74", + "status": "ACTIVE", + "displayText": "nationalid", + "classificationNames": [ + "T1", "Tx" + ], + "classifications": [ + { + "typeName": "Tx", + "attributes": { + "type": "ssn" + }, + "entityGuid": "0ce68113-77fe-4ed1-9585-69371202bd74", + "entityStatus": "ACTIVE", + "propagate": true, + "removePropagationsOnEntityDelete": false + }, + { + "typeName": "T1", + "attributes": { + "type": "ssn" + }, + "entityGuid": "22222222-77fe-4ed1-9585-69371202bd74", + "entityStatus": "ACTIVE", + "propagate": true, + "removePropagationsOnEntityDelete": false + } + ], + "meaningNames": [], + "meanings": [] + } + } +} diff --git a/repository/src/test/resources/json/classification-association/header-Tx.json b/repository/src/test/resources/json/classification-association/header-Tx.json new file mode 100644 index 0000000..bab1eaa --- /dev/null +++ b/repository/src/test/resources/json/classification-association/header-Tx.json @@ -0,0 +1,26 @@ +{ + "typeName": "hive_table", + "attributes": { + "owner": "hive", + "createTime": 1547071410000, + "qualifiedName": "stocks.daily@cl1", + "name": "daily" + }, + "guid": "df122fc3-5555-40f8-a30f-3090b8a622f8", + "status": "ACTIVE", + "displayText": "daily", + "classificationNames": [], + "classifications": [ + { + "typeName": "Tx", + "attributes": {}, + "entityGuid": "df122fc3-5555-40f8-a30f-3090b8a622f8", + "entityStatus": "ACTIVE", + "propagate": false, + "validityPeriods": [], + "removePropagationsOnEntityDelete": false + } + ], + "meaningNames": [], + "meanings": [] +} \ No newline at end of file diff --git a/repository/src/test/resources/json/classification-association/header-empty.json b/repository/src/test/resources/json/classification-association/header-empty.json new file mode 100644 index 0000000..9c8f417 --- /dev/null +++ b/repository/src/test/resources/json/classification-association/header-empty.json @@ -0,0 +1,3 @@ +{ + "guidHeaderMap": {} +} \ No newline at end of file diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java index 68c132c..713338e 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/EntityREST.java @@ -26,10 +26,12 @@ import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.AtlasEntityHeaders; import org.apache.atlas.model.instance.ClassificationAssociateRequest; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.repository.audit.EntityAuditRepository; +import org.apache.atlas.repository.store.graph.v2.ClassificationAssociator; import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream; @@ -49,7 +51,16 @@ import org.springframework.stereotype.Service; import javax.inject.Inject; import javax.inject.Singleton; import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.*; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import java.util.ArrayList; @@ -77,7 +88,6 @@ public class EntityREST { private final EntityAuditRepository auditRepository; private final AtlasInstanceConverter instanceConverter; - @Inject public EntityREST(AtlasTypeRegistry typeRegistry, AtlasEntityStore entitiesStore, EntityAuditRepository auditRepository, AtlasInstanceConverter instanceConverter) { @@ -601,7 +611,7 @@ public class EntityREST { } /** - * Bulk API to create new entities or update existing entities in Atlas. + * Bulk API to create new entities or updates existing entities in Atlas. * Existing entity is matched using its unique guid if supplied or by its unique attributes eg: qualifiedName */ @POST @@ -708,6 +718,49 @@ public class EntityREST { } } + @GET + @Path("bulk/headers") + @Produces(Servlets.JSON_MEDIA_TYPE) + public AtlasEntityHeaders getEntityHeaders(@QueryParam("tagUpdateStartTime") long tagUpdateStartTime) throws AtlasBaseException { + AtlasPerfTracer perf = null; + + try { + long tagUpdateEndTime = System.currentTimeMillis(); + + if (tagUpdateStartTime > tagUpdateEndTime) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "fromTimestamp should be less than toTimestamp"); + } + + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.getEntityHeaders(" + tagUpdateStartTime + ", " + tagUpdateEndTime + ")"); + } + + ClassificationAssociator.Retriever associator = new ClassificationAssociator.Retriever(typeRegistry, auditRepository); + return associator.get(tagUpdateStartTime, tagUpdateEndTime); + } finally { + AtlasPerfTracer.log(perf); + } + } + + @POST + @Path("bulk/setClassifications") + @Produces(Servlets.JSON_MEDIA_TYPE) + @Consumes(Servlets.JSON_MEDIA_TYPE) + public String setClassifications(AtlasEntityHeaders entityHeaders) throws AtlasBaseException { + AtlasPerfTracer perf = null; + + try { + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityREST.setClassifications()"); + } + + ClassificationAssociator.Updater associator = new ClassificationAssociator.Updater(typeRegistry, entitiesStore); + return associator.setClassifications(entityHeaders.getGuidHeaderMap()); + } finally { + AtlasPerfTracer.log(perf); + } + } + private AtlasEntityType ensureEntityType(String typeName) throws AtlasBaseException { AtlasEntityType ret = typeRegistry.getEntityTypeByName(typeName);