This is an automated email from the ASF dual-hosted git repository. kbhatt pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 52bc5d7cf1bab8c6937e7c5d51e5c5bd1545cba8 Author: Mandar Ambawane <[email protected]> AuthorDate: Fri Aug 14 18:26:53 2020 +0530 ATLAS-3583 Use Audit framework to generate audit entries for TypeDefs CREATE, UPDATE and DELETE Signed-off-by: nixonrodrigues <[email protected]> (cherry picked from commit 892df24289d583028e74e78f7b86f6fde785e909) --- addons/models/0000-Area0/0010-base_model.json | 12 +++ ...-base_model_add_atlas_operation_attributes.json | 27 +++++ .../apache/atlas/model/audit/AtlasAuditEntry.java | 5 +- .../atlas/repository/audit/AtlasAuditService.java | 2 +- .../repository/audit/TypeDefAuditListener.java | 112 +++++++++++++++++++++ .../bootstrap/AtlasTypeDefStoreInitializer.java | 49 ++++++++- .../store/graph/v2/AtlasEntityChangeNotifier.java | 12 +++ .../resources/solr/core-template/solrconfig.xml | 2 +- 8 files changed, 216 insertions(+), 5 deletions(-) diff --git a/addons/models/0000-Area0/0010-base_model.json b/addons/models/0000-Area0/0010-base_model.json index 9777b7a..1de9e57 100644 --- a/addons/models/0000-Area0/0010-base_model.json +++ b/addons/models/0000-Area0/0010-base_model.json @@ -25,6 +25,18 @@ { "ordinal": 4, "value": "IMPORT_DELETE_REPL" + }, + { + "ordinal": 5, + "value": "TYPE_DEF_CREATE" + }, + { + "ordinal": 6, + "value": "TYPE_DEF_UPDATE" + }, + { + "ordinal": 7, + "value": "TYPE_DEF_DELETE" } ] } diff --git a/addons/models/0000-Area0/patches/006-base_model_add_atlas_operation_attributes.json b/addons/models/0000-Area0/patches/006-base_model_add_atlas_operation_attributes.json new file mode 100644 index 0000000..a27525e --- /dev/null +++ b/addons/models/0000-Area0/patches/006-base_model_add_atlas_operation_attributes.json @@ -0,0 +1,27 @@ +{ + "patches": [ + { + "id": "TYPEDEF_PATCH_0006_001", + "description": "Add additional operations in Atlas", + "action": "UPDATE_ENUMDEF", + "typeName": "atlas_operation", + "applyToVersion": "1.0", + "updateToVersion": "1.1", + "params": null, + "elementDefs": [ + { + "ordinal": 5, + "value": "TYPE_DEF_CREATE" + }, + { + "ordinal": 6, + "value": "TYPE_DEF_UPDATE" + }, + { + "ordinal": 7, + "value": "TYPE_DEF_DELETE" + } + ] + } + ] +} diff --git a/intg/src/main/java/org/apache/atlas/model/audit/AtlasAuditEntry.java b/intg/src/main/java/org/apache/atlas/model/audit/AtlasAuditEntry.java index a95cf4e..9ed4168 100644 --- a/intg/src/main/java/org/apache/atlas/model/audit/AtlasAuditEntry.java +++ b/intg/src/main/java/org/apache/atlas/model/audit/AtlasAuditEntry.java @@ -40,7 +40,10 @@ public class AtlasAuditEntry extends AtlasBaseModelObject implements Serializabl PURGE("PURGE"), EXPORT("EXPORT"), IMPORT("IMPORT"), - IMPORT_DELETE_REPL("IMPORT_DELETE_REPL"); + IMPORT_DELETE_REPL("IMPORT_DELETE_REPL"), + TYPE_DEF_CREATE("TYPE_DEF_CREATE"), + TYPE_DEF_UPDATE("TYPE_DEF_UPDATE"), + TYPE_DEF_DELETE("TYPE_DEF_DELETE"); private final String type; diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/AtlasAuditService.java b/repository/src/main/java/org/apache/atlas/repository/audit/AtlasAuditService.java index a0dc816..d843204 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/AtlasAuditService.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/AtlasAuditService.java @@ -46,7 +46,7 @@ import java.util.Set; @AtlasService public class AtlasAuditService { private static final Logger LOG = LoggerFactory.getLogger(AtlasAuditService.class); - private static final String ENTITY_TYPE_AUDIT_ENTRY = "__AtlasAuditEntry"; + public static final String ENTITY_TYPE_AUDIT_ENTRY = "__AtlasAuditEntry"; private final DataAccess dataAccess; private final AtlasDiscoveryService discoveryService; diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/TypeDefAuditListener.java b/repository/src/main/java/org/apache/atlas/repository/audit/TypeDefAuditListener.java new file mode 100644 index 0000000..bfc300e --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/audit/TypeDefAuditListener.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.audit; + +import org.apache.atlas.RequestContext; +import org.apache.atlas.authorize.AtlasAuthorizationUtils; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.listener.ChangedTypeDefs; +import org.apache.atlas.listener.TypeDefChangeListener; +import org.apache.atlas.model.TypeCategory; +import org.apache.atlas.model.audit.AtlasAuditEntry; +import org.apache.atlas.model.typedef.AtlasBaseTypeDef; +import org.apache.atlas.utils.AtlasJson; +import org.apache.commons.collections.CollectionUtils; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; + +@Component +@Order(2) +public class TypeDefAuditListener implements TypeDefChangeListener { + + AtlasAuditService auditService; + + @Inject + TypeDefAuditListener(AtlasAuditService auditService) { + this.auditService = auditService; + } + + @Override + public void onChange(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException { + createAuditEntry(changedTypeDefs); + } + + @Override + public void onLoadCompletion() throws AtlasBaseException { + } + + private void createAuditEntry(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException { + List<AtlasBaseTypeDef> createdTypes = (List<AtlasBaseTypeDef>) changedTypeDefs.getCreatedTypeDefs(); + List<AtlasBaseTypeDef> updatedTypes = (List<AtlasBaseTypeDef>) changedTypeDefs.getUpdatedTypeDefs(); + List<AtlasBaseTypeDef> deletedTypes = (List<AtlasBaseTypeDef>) changedTypeDefs.getDeletedTypeDefs(); + + updatedTypes = removeDuplicateEntries(createdTypes, updatedTypes); + + createAuditEntry(createdTypes, AtlasAuditEntry.AuditOperation.TYPE_DEF_CREATE); + createAuditEntry(updatedTypes, AtlasAuditEntry.AuditOperation.TYPE_DEF_UPDATE); + createAuditEntry(deletedTypes, AtlasAuditEntry.AuditOperation.TYPE_DEF_DELETE); + } + + private List<AtlasBaseTypeDef> removeDuplicateEntries(List<AtlasBaseTypeDef> createdTypes, List<AtlasBaseTypeDef> updatedTypes) { + if (CollectionUtils.isNotEmpty(createdTypes)) { + List<String> createdTypeNames = createdTypes.stream() + .map(obj -> obj.getName()).collect(Collectors.toList()); + updatedTypes.removeIf(obj -> createdTypeNames.contains(obj.getName())); + } + if (CollectionUtils.isNotEmpty(updatedTypes)) { + Set<AtlasBaseTypeDef> baseTypeDefs = updatedTypes.stream() + .collect(Collectors.toCollection(() -> + new TreeSet<>(Comparator.comparing(AtlasBaseTypeDef::getName)))); + updatedTypes = new ArrayList<>(baseTypeDefs); + } + return updatedTypes; + } + + private void createAuditEntry(List<AtlasBaseTypeDef> baseTypeDefList, AtlasAuditEntry.AuditOperation auditOperation) throws AtlasBaseException { + if (CollectionUtils.isEmpty(baseTypeDefList)) { + return; + } + final String clientIp = RequestContext.get().getClientIPAddress(); + final Date startTime = new Date(RequestContext.get().getRequestTime()); + final Date endTime = new Date(); + + Map<TypeCategory, List<AtlasBaseTypeDef>> groupByCategoryMap = + baseTypeDefList.stream().collect(Collectors.groupingBy(AtlasBaseTypeDef::getCategory)); + + List<String> categories = new ArrayList<>(); + for (TypeCategory category : groupByCategoryMap.keySet()) { + categories.add(category.name()); + } + + String typeDefJson = AtlasJson.toJson(groupByCategoryMap); + + auditService.add(RequestContext.get().getUser() == null ? "" : RequestContext.get().getUser(), auditOperation, + clientIp != null ? clientIp : "", startTime, endTime, String.join(",", categories), + typeDefJson, baseTypeDefList.size()); + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java index 8e7c1b3..c29888f 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java @@ -443,6 +443,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { Arrays.sort(typePatchFiles); PatchHandler[] patchHandlers = new PatchHandler[] { + new UpdateEnumDefPatchHandler(typeDefStore, typeRegistry), new AddAttributePatchHandler(typeDefStore, typeRegistry), new UpdateAttributePatchHandler(typeDefStore, typeRegistry), new RemoveLegacyRefAttributesPatchHandler(typeDefStore, typeRegistry), @@ -527,6 +528,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { private String updateToVersion; private Map<String, Object> params; private List<AtlasAttributeDef> attributeDefs; + private List<AtlasEnumElementDef> elementDefs; private Map<String, String> typeDefOptions; private String serviceType; private String attributeName; @@ -595,6 +597,14 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { this.attributeDefs = attributeDefs; } + public List<AtlasEnumElementDef> getElementDefs() { + return elementDefs; + } + + public void setElementDefs(List<AtlasEnumElementDef> elementDefs) { + this.elementDefs = elementDefs; + } + public Map<String, String> getTypeDefOptions() { return typeDefOptions; } @@ -661,13 +671,48 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { } } + class UpdateEnumDefPatchHandler extends PatchHandler { + public UpdateEnumDefPatchHandler(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) { + super(typeDefStore, typeRegistry, new String[]{"UPDATE_ENUMDEF"}); + } + + @Override + public PatchStatus applyPatch(TypeDefPatch patch) throws AtlasBaseException { + String typeName = patch.getTypeName(); + AtlasBaseTypeDef typeDef = typeRegistry.getTypeDefByName(typeName); + PatchStatus ret; + if (typeDef == null) { + throw new AtlasBaseException(AtlasErrorCode.PATCH_FOR_UNKNOWN_TYPE, patch.getAction(), typeName); + } + if (isPatchApplicable(patch, typeDef)) { + if (typeDef.getClass().equals(AtlasEnumDef.class)) { + AtlasEnumDef updatedDef = new AtlasEnumDef((AtlasEnumDef) typeDef); + + for (AtlasEnumElementDef elementDef : patch.getElementDefs()) { + updatedDef.addElement(elementDef); + } + updatedDef.setTypeVersion(patch.getUpdateToVersion()); + typeDefStore.updateEnumDefByName(typeName, updatedDef); + ret = APPLIED; + } else { + throw new AtlasBaseException(AtlasErrorCode.PATCH_NOT_APPLICABLE_FOR_TYPE, patch.getAction(), typeDef.getClass().getSimpleName()); + } + } else { + LOG.info("patch skipped: typeName={}; applyToVersion={}; updateToVersion={}", + patch.getTypeName(), patch.getApplyToVersion(), patch.getUpdateToVersion()); + ret = SKIPPED; + } + return ret; + } + } + class AddAttributePatchHandler extends PatchHandler { public AddAttributePatchHandler(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) { super(typeDefStore, typeRegistry, new String[] { "ADD_ATTRIBUTE" }); } - @Override - public PatchStatus applyPatch(TypeDefPatch patch) throws AtlasBaseException { + @Override + public PatchStatus applyPatch(TypeDefPatch patch) throws AtlasBaseException { String typeName = patch.getTypeName(); AtlasBaseTypeDef typeDef = typeRegistry.getTypeDefByName(typeName); PatchStatus ret; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java index 0dc3193..32ad65e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityChangeNotifier.java @@ -35,6 +35,7 @@ import org.apache.atlas.model.instance.AtlasRelationship; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.EntityMutations.EntityOperation; import org.apache.atlas.model.notification.EntityNotification; +import org.apache.atlas.repository.audit.AtlasAuditService; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder; @@ -59,6 +60,8 @@ import java.util.List; import java.util.ListIterator; import java.util.Map; import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_ADD; import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV2.PROPAGATED_CLASSIFICATION_DELETE; @@ -402,10 +405,19 @@ public class AtlasEntityChangeNotifier implements IAtlasEntityChangeNotifier { return listener.getClass().getSimpleName(); } + private static final Predicate<AtlasEntityHeader> PRED_IS_NOT_TYPE_AUDIT_ENTITY = obj -> !obj.getTypeName().equals(AtlasAuditService.ENTITY_TYPE_AUDIT_ENTRY); + + private boolean skipAuditEntries(List<AtlasEntityHeader> entityHeaders) { + return CollectionUtils.isEmpty(entityHeaders) || !entityHeaders.stream().anyMatch(PRED_IS_NOT_TYPE_AUDIT_ENTITY); + } + private void notifyListeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException { if (CollectionUtils.isEmpty(entityHeaders)) { return; } + if (skipAuditEntries(entityHeaders)) { + return; + } MetricRecorder metric = RequestContext.get().startMetricRecord("notifyListeners"); diff --git a/test-tools/src/main/resources/solr/core-template/solrconfig.xml b/test-tools/src/main/resources/solr/core-template/solrconfig.xml index 8ebbeff..7cbfbd9 100644 --- a/test-tools/src/main/resources/solr/core-template/solrconfig.xml +++ b/test-tools/src/main/resources/solr/core-template/solrconfig.xml @@ -434,7 +434,7 @@ --> <lst name="defaults"> <str name="defType">edismax</str> - <str name="qf">3hmt_t 35x_t f0l_t i6d_l 7f2d_t 7gn9_t 3oqt_s jr9_t 3rwl_t lc5_t mx1_t 7dhh_t iyt_l 3j7p_t 7klh_t 7hfp_t 7i85_t ohx_t 7bwl_l 7cp1_l</str> + <str name="qf">3k05_t 35x_t f0l_t i6d_l 7f2d_t 7gn9_t 3r45_s jr9_t 3u9x_t lc5_t mx1_t 7dhh_t iyt_l 3j7p_t 7klh_t 7hfp_t 7i85_t ohx_t 7bwl_l 7cp1_l</str> <str name="hl.fl">*</str> <bool name="hl.requireFieldMatch">true</bool> <bool name="lowercaseOperators">true</bool>
