Repository: atlas Updated Branches: refs/heads/branch-0.8 2a547434b -> 8f99ffedf
ATLAS-2934: utility to detect and repair incorrect entity state Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/8f99ffed Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/8f99ffed Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/8f99ffed Branch: refs/heads/branch-0.8 Commit: 8f99ffedfb9f8b87b4142167cb9e26ebb13f232c Parents: 2a54743 Author: Madhan Neethiraj <mad...@apache.org> Authored: Tue Oct 23 09:16:26 2018 -0700 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Wed Oct 24 16:01:28 2018 -0700 ---------------------------------------------------------------------- .../model/instance/AtlasCheckStateRequest.java | 100 ++++++ .../model/instance/AtlasCheckStateResult.java | 257 +++++++++++++++ .../store/graph/AtlasEntityStore.java | 10 + .../store/graph/v1/AtlasEntityStoreV1.java | 26 ++ .../store/graph/v1/EntityGraphRetriever.java | 59 ++-- .../store/graph/v1/EntityStateChecker.java | 323 +++++++++++++++++++ .../atlas/web/resources/AdminResource.java | 62 ++-- .../atlas/web/resources/AdminResourceTest.java | 4 +- 8 files changed, 802 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/8f99ffed/intg/src/main/java/org/apache/atlas/model/instance/AtlasCheckStateRequest.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasCheckStateRequest.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasCheckStateRequest.java new file mode 100644 index 0000000..3184746 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasCheckStateRequest.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.model.instance; + +import org.apache.atlas.model.typedef.AtlasBaseTypeDef; +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.io.Serializable; +import java.util.Set; + +import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE; +import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY; + + +/** + * Request to run state-check of entities + */ +@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE) +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown=true) +@XmlRootElement +@XmlAccessorType(XmlAccessType.PROPERTY) +public class AtlasCheckStateRequest implements Serializable { + private static final long serialVersionUID = 1L; + + private Set<String> entityGuids; + private Set<String> entityTypes; + private boolean fixIssues; + + + public AtlasCheckStateRequest() { + } + + public Set<String> getEntityGuids() { + return entityGuids; + } + + public void setEntityGuids(Set<String> entityGuids) { + this.entityGuids = entityGuids; + } + + public Set<String> getEntityTypes() { + return entityTypes; + } + + public void setEntityTypes(Set<String> entityTypes) { + this.entityTypes = entityTypes; + } + + public boolean getFixIssues() { + return fixIssues; + } + + public void setFixIssues(boolean fixIssues) { + this.fixIssues = fixIssues; + } + + + public StringBuilder toString(StringBuilder sb) { + if (sb == null) { + sb = new StringBuilder(); + } + + sb.append("AtlasCheckStateRequest{"); + sb.append("entityGuids=["); + AtlasBaseTypeDef.dumpObjects(entityGuids, sb); + sb.append("], entityTypes=["); + AtlasBaseTypeDef.dumpObjects(entityTypes, sb); + sb.append("]"); + sb.append(", fixIssues=").append(fixIssues); + sb.append("}"); + + return sb; + } + + @Override + public String toString() { + return toString(new StringBuilder()).toString(); + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/8f99ffed/intg/src/main/java/org/apache/atlas/model/instance/AtlasCheckStateResult.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasCheckStateResult.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasCheckStateResult.java new file mode 100644 index 0000000..503225a --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasCheckStateResult.java @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.model.instance; + +import org.apache.atlas.model.typedef.AtlasBaseTypeDef; +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.codehaus.jackson.map.annotate.JsonSerialize; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.NONE; +import static org.codehaus.jackson.annotate.JsonAutoDetect.Visibility.PUBLIC_ONLY; + + +/** + * Result of Atlas state check run. + */ +@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE) +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown=true) +@XmlRootElement +@XmlAccessorType(XmlAccessType.PROPERTY) +public class AtlasCheckStateResult implements Serializable { + private static final long serialVersionUID = 1L; + + public enum State { OK, FIXED, PARTIALLY_FIXED, NOT_FIXED} + + private int entitiesScanned = 0; + private int entitiesOk = 0; + private int entitiesFixed = 0; + private int entitiesPartiallyFixed = 0; + private int entitiesNotFixed = 0; + private State state = State.OK; + private Map<String, AtlasEntityState> entities = null; + + + public AtlasCheckStateResult() { + } + + public int getEntitiesScanned() { + return entitiesScanned; + } + + public void setEntitiesScanned(int entitiesScanned) { + this.entitiesScanned = entitiesScanned; + } + + public void incrEntitiesScanned() { entitiesScanned++; } + + public int getEntitiesOk() { + return entitiesOk; + } + + public void setEntitiesOk(int entitiesOk) { + this.entitiesOk = entitiesOk; + } + + public void incrEntitiesOk() { entitiesOk++; } + + public int getEntitiesFixed() { + return entitiesFixed; + } + + public void setEntitiesFixed(int entitiesFixed) { + this.entitiesFixed = entitiesFixed; + } + + public void incrEntitiesFixed() { entitiesFixed++; } + + public int getEntitiesPartiallyFixed() { + return entitiesPartiallyFixed; + } + + public void setEntitiesPartiallyFixed(int entitiesPartiallyFixed) { + this.entitiesPartiallyFixed = entitiesPartiallyFixed; + } + + public void incrEntitiesPartiallyFixed() { entitiesPartiallyFixed++; } + + public int getEntitiesNotFixed() { + return entitiesNotFixed; + } + + public void setEntitiesNotFixed(int entitiesNotFixed) { + this.entitiesNotFixed = entitiesNotFixed; + } + + public void incrEntitiesNotFixed() { entitiesNotFixed++; } + + public State getState() { + return state; + } + + public void setState(State state) { + this.state = state; + } + + public Map<String, AtlasEntityState> getEntities() { + return entities; + } + + public void setEntities(Map<String, AtlasEntityState> entities) { + this.entities = entities; + } + + public StringBuilder toString(StringBuilder sb) { + if (sb == null) { + sb = new StringBuilder(); + } + + sb.append("AtlasCheckStateResult{"); + sb.append("entitiesScanned='").append(entitiesScanned); + sb.append(", entitiesFixed=").append(entitiesFixed); + sb.append(", entitiesPartiallyFixed=").append(entitiesPartiallyFixed); + sb.append(", entitiesNotFixed=").append(entitiesNotFixed); + sb.append(", state=").append(state); + + sb.append("entities=["); + if (entities != null) { + boolean isFirst = true; + for (Map.Entry<String, AtlasEntityState> entry : entities.entrySet()) { + if (isFirst) { + isFirst = false; + } else { + sb.append(","); + } + + sb.append(entry.getKey()).append(":"); + entry.getValue().toString(sb); + } + } + sb.append("]"); + + sb.append("}"); + + return sb; + } + + @Override + public String toString() { + return toString(new StringBuilder()).toString(); + } + + @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE) + @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) + @JsonIgnoreProperties(ignoreUnknown=true) + @XmlRootElement + @XmlAccessorType(XmlAccessType.PROPERTY) + public static class AtlasEntityState implements Serializable { + private static final long serialVersionUID = 1L; + + private String guid; + private String typeName; + private String name; + private AtlasEntity.Status status; + private State state = State.OK; + private List<String> issues; + + + public AtlasEntityState() { + } + + public String getGuid() { + return guid; + } + + public void setGuid(String guid) { + this.guid = guid; + } + + public String getTypeName() { + return typeName; + } + + public void setTypeName(String typeName) { + this.typeName = typeName; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public AtlasEntity.Status getStatus() { + return status; + } + + public void setStatus(AtlasEntity.Status status) { + this.status = status; + } + + + public State getState() { + return state; + } + + public void setState(State state) { + this.state = state; + } + + public List<String> getIssues() { + return issues; + } + + public void setIssues(List<String> issues) { + this.issues = issues; + } + + public StringBuilder toString(StringBuilder sb) { + if (sb == null) { + sb = new StringBuilder(); + } + + sb.append("AtlasEntityState{"); + sb.append("guid=").append(guid); + sb.append(", typeName=").append(typeName); + sb.append(", name=").append(name); + sb.append(", status=").append(status); + sb.append(", state=").append(state); + sb.append(", issues=["); + AtlasBaseTypeDef.dumpObjects(issues, sb); + sb.append("]"); + sb.append("}"); + + return sb; + } + + @Override + public String toString() { + return toString(new StringBuilder()).toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/8f99ffed/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java index 6a1a88f..fc84421 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java @@ -18,6 +18,8 @@ package org.apache.atlas.repository.store.graph; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasCheckStateRequest; +import org.apache.atlas.model.instance.AtlasCheckStateResult; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; @@ -98,6 +100,14 @@ public interface AtlasEntityStore { throws AtlasBaseException; /** + * Check state of entities in the store + * @param request AtlasCheckStateRequest + * @return AtlasCheckStateResult + * @throws AtlasBaseException + */ + AtlasCheckStateResult checkState(AtlasCheckStateRequest request) throws AtlasBaseException; + + /** * Create or update entities in the stream * @param entityStream AtlasEntityStream * @return EntityMutationResponse Entity mutations operations with the corresponding set of entities on which these operations were performed http://git-wip-us.apache.org/repos/asf/atlas/blob/8f99ffed/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java index 449a958..5df9295 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java @@ -23,6 +23,8 @@ import org.apache.atlas.GraphTransactionInterceptor; import org.apache.atlas.RequestContextV1; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasCheckStateRequest; +import org.apache.atlas.model.instance.AtlasCheckStateResult; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; @@ -183,6 +185,30 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { return ret; } + /** + * Check state of entities in the store + * @param request AtlasCheckStateRequest + * @return AtlasCheckStateResult + * @throws AtlasBaseException + */ + @Override + @GraphTransaction + public AtlasCheckStateResult checkState(AtlasCheckStateRequest request) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> checkState({})", request); + } + + EntityStateChecker entityStateChecker = new EntityStateChecker(typeRegistry); + + AtlasCheckStateResult ret = entityStateChecker.checkState(request); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== checkState({}, {})", request, ret); + } + + return ret; + } + private EntityMutationResponse createOrUpdate(EntityStream entityStream, boolean isPartialUpdate, boolean replaceClassifications) throws AtlasBaseException { if (LOG.isDebugEnabled()) { LOG.debug("==> createOrUpdate()"); http://git-wip-us.apache.org/repos/asf/atlas/blob/8f99ffed/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java index 0445e27..549e595 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java @@ -42,6 +42,7 @@ import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeUtil; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -158,6 +159,28 @@ public final class EntityGraphRetriever { return ret; } + public Map<String, Object> getEntityUniqueAttribute(AtlasVertex entityVertex) throws AtlasBaseException { + Map<String, Object> ret = null; + String typeName = AtlasGraphUtilsV1.getTypeName(entityVertex); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); + + if (entityType != null && MapUtils.isNotEmpty(entityType.getUniqAttributes())) { + for (AtlasAttribute attribute : entityType.getUniqAttributes().values()) { + Object val = mapVertexToAttribute(entityVertex, attribute, null, false); + + if (val != null) { + if (ret == null) { + ret = new HashMap<>(); + } + + ret.put(attribute.getName(), val); + } + } + } + + return ret; + } + private AtlasVertex getEntityVertex(AtlasObjectId objId) throws AtlasBaseException { AtlasVertex ret = null; @@ -377,6 +400,24 @@ public final class EntityGraphRetriever { return classifications.get(0); } + public AtlasClassification getClassification(AtlasVertex instanceVertex, String classificationName) throws AtlasBaseException { + + AtlasClassification ret = null; + if (LOG.isDebugEnabled()) { + LOG.debug("mapping classification {} to atlas entity", classificationName); + } + + Iterable<AtlasEdge> edges = instanceVertex.getEdges(AtlasEdgeDirection.OUT, classificationName); + AtlasEdge edge = (edges != null && edges.iterator().hasNext()) ? edges.iterator().next() : null; + + if (edge != null) { + ret = new AtlasClassification(classificationName); + mapAttributes(edge.getInVertex(), ret, null); + } + + return ret; + } + private List<AtlasClassification> getClassifications(AtlasVertex instanceVertex, String classificationNameFilter) throws AtlasBaseException { List<AtlasClassification> classifications = new ArrayList<>(); @@ -406,24 +447,6 @@ public final class EntityGraphRetriever { return classifications; } - private AtlasClassification getClassification(AtlasVertex instanceVertex, String classificationName) throws AtlasBaseException { - - AtlasClassification ret = null; - if (LOG.isDebugEnabled()) { - LOG.debug("mapping classification {} to atlas entity", classificationName); - } - - Iterable<AtlasEdge> edges = instanceVertex.getEdges(AtlasEdgeDirection.OUT, classificationName); - AtlasEdge edge = (edges != null && edges.iterator().hasNext()) ? edges.iterator().next() : null; - - if (edge != null) { - ret = new AtlasClassification(classificationName); - mapAttributes(edge.getInVertex(), ret, null); - } - - return ret; - } - private void mapClassifications(AtlasVertex entityVertex, AtlasEntity entity) throws AtlasBaseException { final List<AtlasClassification> classifications = getClassifications(entityVertex, null); entity.setClassifications(classifications); http://git-wip-us.apache.org/repos/asf/atlas/blob/8f99ffed/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStateChecker.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStateChecker.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStateChecker.java new file mode 100644 index 0000000..3f87f56 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityStateChecker.java @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v1; + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContextV1; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasCheckStateRequest; +import org.apache.atlas.model.instance.AtlasCheckStateResult; +import org.apache.atlas.model.instance.AtlasCheckStateResult.AtlasEntityState; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graph.AtlasGraphProvider; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; +import org.apache.atlas.repository.graphdb.AtlasGraphQuery; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasClassificationType; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +@Component +public final class EntityStateChecker { + private static final Logger LOG = LoggerFactory.getLogger(EntityStateChecker.class); + + private final AtlasTypeRegistry typeRegistry; + private final EntityGraphRetriever entityRetriever; + + @Inject + public EntityStateChecker(AtlasTypeRegistry typeRegistry) { + this.typeRegistry = typeRegistry; + this.entityRetriever = new EntityGraphRetriever(typeRegistry); + } + + + public AtlasCheckStateResult checkState(AtlasCheckStateRequest request) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> checkState({})", request); + } + + AtlasCheckStateResult ret = new AtlasCheckStateResult(); + + if (request != null) { + if (CollectionUtils.isNotEmpty(request.getEntityGuids())) { + for (String guid : request.getEntityGuids()) { + checkEntityState(guid, request.getFixIssues(), ret); + } + } else if (CollectionUtils.isNotEmpty(request.getEntityTypes())) { + final Collection<String> entityTypes; + + if (request.getEntityTypes().contains("*")) { + entityTypes = typeRegistry.getAllEntityDefNames(); + } else { + entityTypes = request.getEntityTypes(); + } + + LOG.info("checkState(): scanning for entities of {} types", entityTypes.size()); + + for (String typeName : entityTypes) { + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(typeName); + + if (entityType == null) { + LOG.warn("checkState(): {} - entity-type not found", typeName); + + continue; + } + + LOG.info("checkState(): scanning for {} entities", typeName); + + AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName); + + int count = 0; + for (Iterator<AtlasVertex> iter = query.vertices().iterator(); iter.hasNext(); count++) { + checkEntityState(iter.next(), request.getFixIssues(), ret); + } + + LOG.info("checkState(): scanned {} {} entities", count, typeName); + } + } + + int incorrectFixed = ret.getEntitiesFixed(); + int incorrectPartiallyFixed = ret.getEntitiesPartiallyFixed(); + int incorrectNotFixed = ret.getEntitiesNotFixed(); + + if (incorrectFixed == 0 && incorrectPartiallyFixed == 0 && incorrectNotFixed == 0) { + ret.setState(AtlasCheckStateResult.State.OK); + } else if (incorrectPartiallyFixed != 0) { + ret.setState(AtlasCheckStateResult.State.PARTIALLY_FIXED); + } else if (incorrectNotFixed != 0) { + ret.setState(incorrectFixed > 0 ? AtlasCheckStateResult.State.PARTIALLY_FIXED : AtlasCheckStateResult.State.NOT_FIXED); + } else { + ret.setState(AtlasCheckStateResult.State.FIXED); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== checkState({}, {})", request, ret); + } + + return ret; + } + + + /** + * Check an entity state given its GUID + * @param guid + * @return + * @throws AtlasBaseException + */ + public AtlasEntityState checkEntityState(String guid, boolean fixIssues, AtlasCheckStateResult result) throws AtlasBaseException { + AtlasVertex entityVertex = AtlasGraphUtilsV1.findByGuid(guid); + + if (entityVertex == null) { + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid); + } + + return checkEntityState(entityVertex, fixIssues, result); + } + + /** + * Check an entity state given its vertex + * @param entityVertex + * @return + * @throws AtlasBaseException + */ + public AtlasEntityState checkEntityState(AtlasVertex entityVertex, boolean fixIssues, AtlasCheckStateResult result) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> checkEntityState(guid={})", AtlasGraphUtilsV1.getIdFromVertex(entityVertex)); + } + + AtlasEntityState ret = new AtlasEntityState(); + + ret.setGuid(AtlasGraphUtilsV1.getIdFromVertex(entityVertex)); + ret.setTypeName(AtlasGraphUtilsV1.getTypeName(entityVertex)); + ret.setName(getEntityName(entityVertex)); + ret.setStatus(AtlasGraphUtilsV1.getState(entityVertex)); + ret.setState(AtlasCheckStateResult.State.OK); + + checkEntityState_Classifications(entityVertex, ret, fixIssues); + + if (ret.getState() != AtlasCheckStateResult.State.OK) { // don't include clean entities in the response + if (result.getEntities() == null) { + result.setEntities(new HashMap<String, AtlasEntityState>()); + } + + result.getEntities().put(ret.getGuid(), ret); + } + + result.incrEntitiesScanned(); + + switch (ret.getState()) { + case FIXED: + result.incrEntitiesFixed(); + break; + + case PARTIALLY_FIXED: + result.incrEntitiesPartiallyFixed(); + break; + + case NOT_FIXED: + result.incrEntitiesNotFixed(); + break; + + case OK: + result.incrEntitiesOk(); + break; + } + + LOG.info("checkEntityState(guid={}; type={}; name={}): {}", ret.getGuid(), ret.getTypeName(), ret.getName(), ret.getState()); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== checkEntityState({}): {}", ret.getGuid(), ret); + } + + return ret; + } + + private void checkEntityState_Classifications(AtlasVertex entityVertex, AtlasEntityState result, boolean fixIssues) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> checkEntityState_Classifications({}, {})", result, fixIssues); + } + + List<String> traitNames = GraphHelper.getTraitNames(entityVertex); + List<String> traitVertexNames = null; + Iterable<AtlasEdge> edges = entityVertex.getEdges(AtlasEdgeDirection.OUT); + + if (edges != null) { + for (Iterator<AtlasEdge> iter = edges.iterator(); iter.hasNext(); ) { + AtlasEdge edge = iter.next(); + String edgeLabel = edge.getLabel(); + String classificationName = GraphHelper.getTypeName(edge.getInVertex()); + + // classification edge name is same as the name of the classification + if (StringUtils.equals(edgeLabel, classificationName)) { + AtlasClassificationType classification = typeRegistry.getClassificationTypeByName(classificationName); + + if (classification != null) { + if (traitVertexNames == null) { + traitVertexNames = new ArrayList<>(); + } + + traitVertexNames.add(classificationName); + } + } + } + } + + List<String> traitNamesToRemove = null; + List<String> traitNamesToAdd = null; + + if (traitNames != null) { + for (String traitName : traitNames) { + if (traitVertexNames == null || !traitVertexNames.contains(traitName)) { + if (traitNamesToRemove == null) { + traitNamesToRemove = new ArrayList<>(); + } + + traitNamesToRemove.add(traitName); + } + } + } + + if (traitVertexNames != null) { + for (String traitVertexName : traitVertexNames) { + if (traitNames == null || !traitNames.contains(traitVertexName)) { + if (traitNamesToAdd == null) { + traitNamesToAdd = new ArrayList<>(); + } + + traitNamesToAdd.add(traitVertexName); + } + } + } + + if (traitNamesToAdd != null || traitNamesToRemove != null) { + List<String> issues = result.getIssues(); + + if (issues == null) { + issues = new ArrayList<>(); + } + + if (traitNamesToAdd != null) { + issues.add("incorrect property: __traitNames has missing classifications: " + traitNamesToAdd.toString()); + } + + if (traitNamesToRemove != null) { + issues.add("incorrect property: __traitNames has unassigned classifications: " + traitNamesToRemove.toString()); + } + + if (fixIssues) { + entityVertex.removeProperty(Constants.TRAIT_NAMES_PROPERTY_KEY); + + for (String classificationName : traitVertexNames) { + AtlasGraphUtilsV1.addEncodedProperty(entityVertex, Constants.TRAIT_NAMES_PROPERTY_KEY, classificationName); + } + + AtlasGraphUtilsV1.setEncodedProperty(entityVertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContextV1.get().getRequestTime()); + + result.setState(AtlasCheckStateResult.State.FIXED); + } else { + result.setState(AtlasCheckStateResult.State.NOT_FIXED); + } + + result.setIssues(issues); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== checkEntityState_Classifications({}, {})", result, fixIssues); + } + } + + private String getEntityName(AtlasVertex entityVertex) throws AtlasBaseException { + String ret = null; + Map<String, Object> uniqueAttributes = entityRetriever.getEntityUniqueAttribute(entityVertex); + + if (uniqueAttributes != null) { + Object val = uniqueAttributes.get("qualifiedName"); + + if (val == null) { + for (Object attrVal : uniqueAttributes.values()) { + if (attrVal != null) { + ret = attrVal.toString(); + + break; + } + } + } else { + ret = val.toString(); + } + } + + return ret; + } +} + http://git-wip-us.apache.org/repos/asf/atlas/blob/8f99ffed/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java index b53ac69..9069f44 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java @@ -34,6 +34,8 @@ import org.apache.atlas.model.impexp.AtlasExportResult; import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.impexp.ExportImportAuditEntry; +import org.apache.atlas.model.instance.AtlasCheckStateRequest; +import org.apache.atlas.model.instance.AtlasCheckStateResult; import org.apache.atlas.model.metrics.AtlasMetrics; import org.apache.atlas.repository.impexp.AtlasServerService; import org.apache.atlas.repository.impexp.ExportImportAuditService; @@ -41,6 +43,7 @@ import org.apache.atlas.repository.impexp.ExportService; import org.apache.atlas.repository.impexp.ImportService; import org.apache.atlas.repository.impexp.ZipSink; import org.apache.atlas.repository.impexp.ZipSource; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.services.MetricsService; import org.apache.atlas.type.AtlasType; import org.apache.atlas.util.SearchTracker; @@ -116,14 +119,15 @@ public class AdminResource { private static final String DEFAULT_EDITABLE_ENTITY_TYPES = "hdfs_path,hbase_table,hbase_column,hbase_column_family,kafka_topic"; private Response version; - private final ServiceState serviceState; - private final MetricsService metricsService; - private static Configuration atlasProperties; - private final ExportService exportService; - private final ImportService importService; - private final SearchTracker activeSearches; - private AtlasServerService atlasServerService; - private ExportImportAuditService exportImportAuditService; + private final ServiceState serviceState; + private final MetricsService metricsService; + private static Configuration atlasProperties; + private final ExportService exportService; + private final ImportService importService; + private final SearchTracker activeSearches; + private final AtlasServerService atlasServerService; + private final ExportImportAuditService exportImportAuditService; + private final AtlasEntityStore entityStore; static { try { @@ -134,17 +138,17 @@ public class AdminResource { } @Inject - public AdminResource(ServiceState serviceState, MetricsService metricsService, - ExportService exportService, ImportService importService, SearchTracker activeSearches, - AtlasServerService serverService, - ExportImportAuditService exportImportAuditService) { - this.serviceState = serviceState; - this.metricsService = metricsService; - this.exportService = exportService; - this.importService = importService; - this.activeSearches = activeSearches; - this.atlasServerService = serverService; - this.exportImportAuditService = exportImportAuditService; + public AdminResource(ServiceState serviceState, MetricsService metricsService, ExportService exportService, + ImportService importService, SearchTracker activeSearches, AtlasServerService serverService, + ExportImportAuditService exportImportAuditService, AtlasEntityStore entityStore) { + this.serviceState = serviceState; + this.metricsService = metricsService; + this.exportService = exportService; + this.importService = importService; + this.activeSearches = activeSearches; + this.atlasServerService = serverService; + this.exportImportAuditService = exportImportAuditService; + this.entityStore = entityStore; this.importExportOperationLock = new ReentrantLock(); } @@ -519,6 +523,26 @@ public class AdminResource { return null != terminate; } + @POST + @Path("checkstate") + @Produces(Servlets.JSON_MEDIA_TYPE) + @Consumes(Servlets.JSON_MEDIA_TYPE) + public AtlasCheckStateResult checkState(AtlasCheckStateRequest request) throws AtlasBaseException { + AtlasPerfTracer perf = null; + + try { + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "checkState(" + request + ")"); + } + + AtlasCheckStateResult ret = entityStore.checkState(request); + + return ret; + } finally { + AtlasPerfTracer.log(perf); + } + } + private String getEditableEntityTypes(Configuration config) { String ret = DEFAULT_EDITABLE_ENTITY_TYPES; http://git-wip-us.apache.org/repos/asf/atlas/blob/8f99ffed/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java index 2f4b3d9..bdfe9e0 100644 --- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java +++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java @@ -48,7 +48,7 @@ public class AdminResourceTest { when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); - AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null); + AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null); Response response = adminResource.getStatus(); assertEquals(response.getStatus(), HttpServletResponse.SC_OK); JSONObject entity = (JSONObject) response.getEntity(); @@ -59,7 +59,7 @@ public class AdminResourceTest { public void testResourceGetsValueFromServiceState() throws JSONException { when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); - AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null); + AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null, null, null); Response response = adminResource.getStatus(); verify(serviceState).getState();