ATLAS-2934: utility to detect and repair incorrect entity state

(cherry picked from commit 8f99ffedfb9f8b87b4142167cb9e26ebb13f232c)


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/529863a1
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/529863a1
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/529863a1

Branch: refs/heads/branch-1.0
Commit: 529863a1012d1032ecabe1cefd451c1bccb2ed16
Parents: e0ef989
Author: Madhan Neethiraj <mad...@apache.org>
Authored: Tue Oct 23 09:16:26 2018 -0700
Committer: Ashutosh Mestry <ames...@hortonworks.com>
Committed: Thu Nov 1 15:42:59 2018 -0700

----------------------------------------------------------------------
 .../model/instance/AtlasCheckStateRequest.java  | 100 ++++++
 .../model/instance/AtlasCheckStateResult.java   | 257 +++++++++++++
 .../store/graph/AtlasEntityStore.java           |  10 +
 .../store/graph/v2/AtlasEntityStoreV2.java      |  24 ++
 .../store/graph/v2/EntityGraphRetriever.java    |  22 ++
 .../store/graph/v2/EntityStateChecker.java      | 358 +++++++++++++++++++
 .../atlas/web/resources/AdminResource.java      |  47 ++-
 .../atlas/web/resources/AdminResourceTest.java  |   6 +-
 8 files changed, 810 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/529863a1/intg/src/main/java/org/apache/atlas/model/instance/AtlasCheckStateRequest.java
----------------------------------------------------------------------
diff --git 
a/intg/src/main/java/org/apache/atlas/model/instance/AtlasCheckStateRequest.java
 
b/intg/src/main/java/org/apache/atlas/model/instance/AtlasCheckStateRequest.java
new file mode 100644
index 0000000..d3878bb
--- /dev/null
+++ 
b/intg/src/main/java/org/apache/atlas/model/instance/AtlasCheckStateRequest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.instance;
+
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+import java.util.Set;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static 
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+
+/**
+ * Request to run state-check of entities
+ */
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, 
fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown=true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class AtlasCheckStateRequest implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private Set<String> entityGuids;
+    private Set<String> entityTypes;
+    private boolean     fixIssues;
+
+
+    public AtlasCheckStateRequest() {
+    }
+
+    public Set<String> getEntityGuids() {
+        return entityGuids;
+    }
+
+    public void setEntityGuids(Set<String> entityGuids) {
+        this.entityGuids = entityGuids;
+    }
+
+    public Set<String> getEntityTypes() {
+        return entityTypes;
+    }
+
+    public void setEntityTypes(Set<String> entityTypes) {
+        this.entityTypes = entityTypes;
+    }
+
+    public boolean getFixIssues() {
+        return fixIssues;
+    }
+
+    public void setFixIssues(boolean fixIssues) {
+        this.fixIssues = fixIssues;
+    }
+
+
+    public StringBuilder toString(StringBuilder sb) {
+        if (sb == null) {
+            sb = new StringBuilder();
+        }
+
+        sb.append("AtlasCheckStateRequest{");
+        sb.append("entityGuids=[");
+        AtlasBaseTypeDef.dumpObjects(entityGuids, sb);
+        sb.append("], entityTypes=[");
+        AtlasBaseTypeDef.dumpObjects(entityTypes, sb);
+        sb.append("]");
+        sb.append(", fixIssues=").append(fixIssues);
+        sb.append("}");
+
+        return sb;
+    }
+
+    @Override
+    public String toString() {
+        return toString(new StringBuilder()).toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/529863a1/intg/src/main/java/org/apache/atlas/model/instance/AtlasCheckStateResult.java
----------------------------------------------------------------------
diff --git 
a/intg/src/main/java/org/apache/atlas/model/instance/AtlasCheckStateResult.java 
b/intg/src/main/java/org/apache/atlas/model/instance/AtlasCheckStateResult.java
new file mode 100644
index 0000000..35665ad
--- /dev/null
+++ 
b/intg/src/main/java/org/apache/atlas/model/instance/AtlasCheckStateResult.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.instance;
+
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static 
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+
+/**
+ * Result of Atlas state check run.
+ */
+@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, 
fieldVisibility=NONE)
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown=true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class AtlasCheckStateResult implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public enum State { OK, FIXED, PARTIALLY_FIXED, NOT_FIXED}
+
+    private int                           entitiesScanned        = 0;
+    private int                           entitiesOk             = 0;
+    private int                           entitiesFixed          = 0;
+    private int                           entitiesPartiallyFixed = 0;
+    private int                           entitiesNotFixed       = 0;
+    private State                         state                  = State.OK;
+    private Map<String, AtlasEntityState> entities               = null;
+
+
+    public AtlasCheckStateResult() {
+    }
+
+    public int getEntitiesScanned() {
+        return entitiesScanned;
+    }
+
+    public void setEntitiesScanned(int entitiesScanned) {
+        this.entitiesScanned = entitiesScanned;
+    }
+
+    public void incrEntitiesScanned() { entitiesScanned++; }
+
+    public int getEntitiesOk() {
+        return entitiesOk;
+    }
+
+    public void setEntitiesOk(int entitiesOk) {
+        this.entitiesOk = entitiesOk;
+    }
+
+    public void incrEntitiesOk() { entitiesOk++; }
+
+    public int getEntitiesFixed() {
+        return entitiesFixed;
+    }
+
+    public void setEntitiesFixed(int entitiesFixed) {
+        this.entitiesFixed = entitiesFixed;
+    }
+
+    public void incrEntitiesFixed() { entitiesFixed++; }
+
+    public int getEntitiesPartiallyFixed() {
+        return entitiesPartiallyFixed;
+    }
+
+    public void setEntitiesPartiallyFixed(int entitiesPartiallyFixed) {
+        this.entitiesPartiallyFixed = entitiesPartiallyFixed;
+    }
+
+    public void incrEntitiesPartiallyFixed() { entitiesPartiallyFixed++; }
+
+    public int getEntitiesNotFixed() {
+        return entitiesNotFixed;
+    }
+
+    public void setEntitiesNotFixed(int entitiesNotFixed) {
+        this.entitiesNotFixed = entitiesNotFixed;
+    }
+
+    public void incrEntitiesNotFixed() { entitiesNotFixed++; }
+
+    public State getState() {
+        return state;
+    }
+
+    public void setState(State state) {
+        this.state = state;
+    }
+
+    public Map<String, AtlasEntityState> getEntities() {
+        return entities;
+    }
+
+    public void setEntities(Map<String, AtlasEntityState> entities) {
+        this.entities = entities;
+    }
+
+    public StringBuilder toString(StringBuilder sb) {
+        if (sb == null) {
+            sb = new StringBuilder();
+        }
+
+        sb.append("AtlasCheckStateResult{");
+        sb.append("entitiesScanned='").append(entitiesScanned);
+        sb.append(", entitiesFixed=").append(entitiesFixed);
+        sb.append(", entitiesPartiallyFixed=").append(entitiesPartiallyFixed);
+        sb.append(", entitiesNotFixed=").append(entitiesNotFixed);
+        sb.append(", state=").append(state);
+
+        sb.append("entities=[");
+        if (entities != null) {
+            boolean isFirst = true;
+            for (Map.Entry<String, AtlasEntityState> entry : 
entities.entrySet()) {
+                if (isFirst) {
+                    isFirst = false;
+                } else {
+                    sb.append(",");
+                }
+
+                sb.append(entry.getKey()).append(":");
+                entry.getValue().toString(sb);
+            }
+        }
+        sb.append("]");
+
+        sb.append("}");
+
+        return sb;
+    }
+
+    @Override
+    public String toString() {
+        return toString(new StringBuilder()).toString();
+    }
+
+    @JsonAutoDetect(getterVisibility=PUBLIC_ONLY, 
setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE)
+    @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+    @JsonIgnoreProperties(ignoreUnknown=true)
+    @XmlRootElement
+    @XmlAccessorType(XmlAccessType.PROPERTY)
+    public static class AtlasEntityState implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private String             guid;
+        private String             typeName;
+        private String             name;
+        private AtlasEntity.Status status;
+        private State              state = State.OK;
+        private List<String>       issues;
+
+
+        public AtlasEntityState() {
+        }
+
+        public String getGuid() {
+            return guid;
+        }
+
+        public void setGuid(String guid) {
+            this.guid = guid;
+        }
+
+        public String getTypeName() {
+            return typeName;
+        }
+
+        public void setTypeName(String typeName) {
+            this.typeName = typeName;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        public AtlasEntity.Status getStatus() {
+            return status;
+        }
+
+        public void setStatus(AtlasEntity.Status status) {
+            this.status = status;
+        }
+
+
+        public State getState() {
+            return state;
+        }
+
+        public void setState(State state) {
+            this.state = state;
+        }
+
+        public List<String> getIssues() {
+            return issues;
+        }
+
+        public void setIssues(List<String> issues) {
+            this.issues = issues;
+        }
+
+        public StringBuilder toString(StringBuilder sb) {
+            if (sb == null) {
+                sb = new StringBuilder();
+            }
+
+            sb.append("AtlasEntityState{");
+            sb.append("guid=").append(guid);
+            sb.append(", typeName=").append(typeName);
+            sb.append(", name=").append(name);
+            sb.append(", status=").append(status);
+            sb.append(", state=").append(state);
+            sb.append(", issues=[");
+            AtlasBaseTypeDef.dumpObjects(issues, sb);
+            sb.append("]");
+            sb.append("}");
+
+            return sb;
+        }
+
+        @Override
+        public String toString() {
+            return toString(new StringBuilder()).toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/529863a1/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
index e6f35fa..750fa17 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/AtlasEntityStore.java
@@ -18,6 +18,8 @@
 package org.apache.atlas.repository.store.graph;
 
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasCheckStateRequest;
+import org.apache.atlas.model.instance.AtlasCheckStateResult;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
@@ -107,6 +109,14 @@ public interface AtlasEntityStore {
             throws AtlasBaseException;
 
     /**
+     * Check state of entities in the store
+     * @param request AtlasCheckStateRequest
+     * @return AtlasCheckStateResult
+     * @throws AtlasBaseException
+     */
+    AtlasCheckStateResult checkState(AtlasCheckStateRequest request) throws 
AtlasBaseException;
+
+    /**
      * Create or update  entities in the stream
      * @param entityStream AtlasEntityStream
      * @return EntityMutationResponse Entity mutations operations with the 
corresponding set of entities on which these operations were performed

http://git-wip-us.apache.org/repos/asf/atlas/blob/529863a1/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index bddbf71..7333696 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -219,6 +219,30 @@ public class AtlasEntityStoreV2 implements 
AtlasEntityStore {
         return ret;
     }
 
+    /**
+     * Check state of entities in the store
+     * @param request AtlasCheckStateRequest
+     * @return AtlasCheckStateResult
+     * @throws AtlasBaseException
+     */
+    @Override
+    @GraphTransaction
+    public AtlasCheckStateResult checkState(AtlasCheckStateRequest request) 
throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> checkState({})", request);
+        }
+
+        EntityStateChecker entityStateChecker = new 
EntityStateChecker(typeRegistry);
+
+        AtlasCheckStateResult ret = entityStateChecker.checkState(request);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== checkState({}, {})", request, ret);
+        }
+
+        return ret;
+    }
+
     @Override
     @GraphTransaction
     public EntityMutationResponse createOrUpdate(EntityStream entityStream, 
boolean isPartialUpdate) throws AtlasBaseException {

http://git-wip-us.apache.org/repos/asf/atlas/blob/529863a1/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
index f03262f..cdf69e3 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
@@ -325,6 +325,28 @@ public final class EntityGraphRetriever {
         return ret;
     }
 
+    public Map<String, Object> getEntityUniqueAttribute(AtlasVertex 
entityVertex) throws AtlasBaseException {
+        Map<String, Object> ret        = null;
+        String              typeName   = 
AtlasGraphUtilsV2.getTypeName(entityVertex);
+        AtlasEntityType     entityType = 
typeRegistry.getEntityTypeByName(typeName);
+
+        if (entityType != null && 
MapUtils.isNotEmpty(entityType.getUniqAttributes())) {
+            for (AtlasAttribute attribute : 
entityType.getUniqAttributes().values()) {
+                Object val = mapVertexToAttribute(entityVertex, attribute, 
null, false);
+
+                if (val != null) {
+                    if (ret == null) {
+                        ret = new HashMap<>();
+                    }
+
+                    ret.put(attribute.getName(), val);
+                }
+            }
+        }
+
+        return ret;
+    }
+
     private AtlasVertex getEntityVertex(AtlasObjectId objId) throws 
AtlasBaseException {
         AtlasVertex ret = null;
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/529863a1/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityStateChecker.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityStateChecker.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityStateChecker.java
new file mode 100644
index 0000000..eb594f2
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityStateChecker.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.store.graph.v2;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.instance.AtlasCheckStateRequest;
+import org.apache.atlas.model.instance.AtlasCheckStateResult;
+import org.apache.atlas.model.instance.AtlasCheckStateResult.AtlasEntityState;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.graph.GraphHelper;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
+import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.atlas.repository.Constants.CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY;
+
+@Component
+public final class EntityStateChecker {
+    private static final Logger LOG = 
LoggerFactory.getLogger(EntityStateChecker.class);
+
+    private final AtlasTypeRegistry    typeRegistry;
+    private final EntityGraphRetriever entityRetriever;
+
+    @Inject
+    public EntityStateChecker(AtlasTypeRegistry typeRegistry) {
+        this.typeRegistry    = typeRegistry;
+        this.entityRetriever = new EntityGraphRetriever(typeRegistry);
+    }
+
+
+    public AtlasCheckStateResult checkState(AtlasCheckStateRequest request) 
throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> checkState({})", request);
+        }
+
+        AtlasCheckStateResult ret = new AtlasCheckStateResult();
+
+        if (request != null) {
+            if (CollectionUtils.isNotEmpty(request.getEntityGuids())) {
+                for (String guid : request.getEntityGuids()) {
+                    checkEntityState(guid, request.getFixIssues(), ret);
+                }
+            } else if (CollectionUtils.isNotEmpty(request.getEntityTypes())) {
+                final Collection<String> entityTypes;
+
+                if (request.getEntityTypes().contains("*")) {
+                    entityTypes = typeRegistry.getAllEntityDefNames();
+                } else {
+                    entityTypes = request.getEntityTypes();
+                }
+
+                LOG.info("checkState(): scanning for entities of {} types", 
entityTypes.size());
+
+                for (String typeName : entityTypes) {
+                    AtlasEntityType entityType = 
typeRegistry.getEntityTypeByName(typeName);
+
+                    if (entityType == null) {
+                        LOG.warn("checkState(): {} - entity-type not found", 
typeName);
+
+                        continue;
+                    }
+
+                    LOG.info("checkState(): scanning for {} entities", 
typeName);
+
+                    AtlasGraphQuery query = 
AtlasGraphProvider.getGraphInstance().query().has(Constants.ENTITY_TYPE_PROPERTY_KEY,
 typeName);
+
+                    int count = 0;
+                    for (Iterator<AtlasVertex> iter = 
query.vertices().iterator(); iter.hasNext(); count++) {
+                        checkEntityState(iter.next(), request.getFixIssues(), 
ret);
+                    }
+
+                    LOG.info("checkState(): scanned {} {} entities", count, 
typeName);
+                }
+            }
+
+            int incorrectFixed          = ret.getEntitiesFixed();
+            int incorrectPartiallyFixed = ret.getEntitiesPartiallyFixed();
+            int incorrectNotFixed       = ret.getEntitiesNotFixed();
+
+            if (incorrectFixed == 0 && incorrectPartiallyFixed == 0 && 
incorrectNotFixed == 0) {
+                ret.setState(AtlasCheckStateResult.State.OK);
+            } else if (incorrectPartiallyFixed != 0) {
+                ret.setState(AtlasCheckStateResult.State.PARTIALLY_FIXED);
+            } else if (incorrectNotFixed != 0) {
+                ret.setState(incorrectFixed > 0 ? 
AtlasCheckStateResult.State.PARTIALLY_FIXED : 
AtlasCheckStateResult.State.NOT_FIXED);
+            } else {
+                ret.setState(AtlasCheckStateResult.State.FIXED);
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== checkState({}, {})", request, ret);
+        }
+
+        return ret;
+    }
+
+
+    /**
+     * Check an entity state given its GUID
+     * @param guid
+     * @return
+     * @throws AtlasBaseException
+     */
+    public AtlasEntityState checkEntityState(String guid, boolean fixIssues, 
AtlasCheckStateResult result) throws AtlasBaseException {
+        AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(guid);
+
+        if (entityVertex == null) {
+            throw new 
AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, guid);
+        }
+
+        return checkEntityState(entityVertex, fixIssues, result);
+    }
+
+    /**
+     * Check an entity state given its vertex
+     * @param entityVertex
+     * @return
+     * @throws AtlasBaseException
+     */
+    public AtlasEntityState checkEntityState(AtlasVertex entityVertex, boolean 
fixIssues, AtlasCheckStateResult result) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> checkEntityState(guid={})", 
AtlasGraphUtilsV2.getIdFromVertex(entityVertex));
+        }
+
+        AtlasEntityState ret = new AtlasEntityState();
+
+        ret.setGuid(AtlasGraphUtilsV2.getIdFromVertex(entityVertex));
+        ret.setTypeName(AtlasGraphUtilsV2.getTypeName(entityVertex));
+        ret.setName(getEntityName(entityVertex));
+        ret.setStatus(AtlasGraphUtilsV2.getState(entityVertex));
+        ret.setState(AtlasCheckStateResult.State.OK);
+
+        checkEntityState_Classifications(entityVertex, ret, fixIssues);
+
+        if (ret.getState() != AtlasCheckStateResult.State.OK) { // don't 
include clean entities in the response
+            if (result.getEntities() == null) {
+                result.setEntities(new HashMap<String, AtlasEntityState>());
+            }
+
+            result.getEntities().put(ret.getGuid(), ret);
+        }
+
+        result.incrEntitiesScanned();
+
+        switch (ret.getState()) {
+            case FIXED:
+                result.incrEntitiesFixed();
+                break;
+
+            case PARTIALLY_FIXED:
+                result.incrEntitiesPartiallyFixed();
+                break;
+
+            case NOT_FIXED:
+                result.incrEntitiesNotFixed();
+                break;
+
+            case OK:
+                result.incrEntitiesOk();
+                break;
+        }
+
+        LOG.info("checkEntityState(guid={}; type={}; name={}): {}", 
ret.getGuid(), ret.getTypeName(), ret.getName(), ret.getState());
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== checkEntityState({}): {}", ret.getGuid(), ret);
+        }
+
+        return ret;
+    }
+
+    private void checkEntityState_Classifications(AtlasVertex entityVertex, 
AtlasEntityState result, boolean fixIssues) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> checkEntityState_Classifications({}, {})", result, 
fixIssues);
+        }
+
+        Collection<String>  traitNames                 = 
entityVertex.getPropertyValues(Constants.TRAIT_NAMES_PROPERTY_KEY, 
String.class);
+        Collection<String>  propagatedTraitNames       = 
entityVertex.getPropertyValues(Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, 
String.class);
+        Collection<String>  traitVertexNames           = null;
+        Collection<String>  propagatedTraitVertexNames = null;
+        Iterable<AtlasEdge> edges                      = 
entityVertex.getEdges(AtlasEdgeDirection.OUT, Constants.CLASSIFICATION_LABEL);
+
+        if (edges != null) {
+            for (Iterator<AtlasEdge> iter = edges.iterator(); iter.hasNext(); 
) {
+                AtlasEdge               edge               = iter.next();
+                Boolean                 isPropagated       = 
AtlasGraphUtilsV2.getEncodedProperty(edge, 
CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, Boolean.class);
+                String                  classificationName = 
GraphHelper.getTypeName(edge.getInVertex());
+                AtlasClassificationType classification     = 
typeRegistry.getClassificationTypeByName(classificationName);
+
+                if (classification != null) {
+                    if (isPropagated != null && isPropagated) {
+                        propagatedTraitVertexNames = 
addToCollection(propagatedTraitVertexNames, classificationName);
+                    } else {
+                        traitVertexNames = addToCollection(traitVertexNames, 
classificationName);
+                    }
+                }
+            }
+
+            if (traitVertexNames == null) {
+                traitVertexNames = Collections.emptyList();
+            }
+
+            if (propagatedTraitVertexNames == null) {
+                propagatedTraitVertexNames = Collections.emptyList();
+            }
+        }
+
+        Collection<String> traitNamesToAdd              = 
subtract(traitVertexNames, traitNames);
+        Collection<String> traitNamesToRemove           = subtract(traitNames, 
traitVertexNames);
+        Collection<String> propagatedTraitNamesToAdd    = 
subtract(propagatedTraitVertexNames, propagatedTraitNames);
+        Collection<String> propagatedTraitNamesToRemove = 
subtract(propagatedTraitNames, propagatedTraitVertexNames);
+
+        if (traitNamesToAdd != null || traitNamesToRemove != null || 
propagatedTraitNamesToAdd != null || propagatedTraitNamesToRemove != null) {
+            List<String> issues = result.getIssues();
+
+            if (issues == null) {
+                issues = new ArrayList<>();
+
+                result.setIssues(issues);
+            }
+
+            if (fixIssues) {
+                if (traitNamesToAdd != null || traitNamesToRemove != null) {
+                    if (traitNamesToAdd != null) {
+                        issues.add("incorrect property: __traitNames has 
missing classifications: " + traitNamesToAdd.toString());
+                    }
+
+                    if (traitNamesToRemove != null) {
+                        issues.add("incorrect property: __traitNames has 
unassigned classifications: " + traitNamesToRemove.toString());
+                    }
+
+                    
entityVertex.removeProperty(Constants.TRAIT_NAMES_PROPERTY_KEY);
+
+                    for (String classificationName : traitVertexNames) {
+                        AtlasGraphUtilsV2.addEncodedProperty(entityVertex, 
Constants.TRAIT_NAMES_PROPERTY_KEY, classificationName);
+                    }
+                }
+
+                if (propagatedTraitNamesToAdd != null || 
propagatedTraitNamesToRemove != null) {
+                    if (propagatedTraitNamesToAdd != null) {
+                        issues.add("incorrect property: __propagatedTraitNames 
has missing classifications: " + propagatedTraitNamesToAdd.toString());
+                    }
+
+                    if (propagatedTraitNamesToRemove != null) {
+                        issues.add("incorrect property: __propagatedTraitNames 
has unassigned classifications: " + propagatedTraitNamesToRemove.toString());
+                    }
+
+                    
entityVertex.removeProperty(Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY);
+
+                    for (String classificationName : 
propagatedTraitVertexNames) {
+                        AtlasGraphUtilsV2.addEncodedProperty(entityVertex, 
Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, classificationName);
+                    }
+                }
+
+                AtlasGraphUtilsV2.setEncodedProperty(entityVertex, 
Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, 
RequestContext.get().getRequestTime());
+
+                result.setState(AtlasCheckStateResult.State.FIXED);
+            } else {
+                result.setState(AtlasCheckStateResult.State.NOT_FIXED);
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== checkEntityState_Classifications({}, {})", result, 
fixIssues);
+        }
+    }
+
+    private String getEntityName(AtlasVertex entityVertex) throws 
AtlasBaseException {
+        String              ret              = null;
+        Map<String, Object> uniqueAttributes = 
entityRetriever.getEntityUniqueAttribute(entityVertex);
+
+        if (uniqueAttributes != null) {
+            Object val = uniqueAttributes.get("qualifiedName");
+
+            if (val == null) {
+                for (Object attrVal : uniqueAttributes.values()) {
+                    if (attrVal != null) {
+                        ret = attrVal.toString();
+
+                        break;
+                    }
+                }
+            } else {
+                ret = val.toString();
+            }
+        }
+
+        return ret;
+    }
+
+    private Collection<String> addToCollection(Collection<String> list, String 
str) {
+        if (list == null) {
+            list = new ArrayList<>();
+        }
+
+        list.add(str);
+
+        return list;
+    }
+
+    // return elements in 'col1' that are not in 'col2'
+    private Collection<String> subtract(Collection<String> col1, 
Collection<String> col2) {
+        Collection<String> ret = null;
+
+        if (col2 == null) {
+            ret = col1;
+        } else if (col1 != null) {
+            for (String elem : col1) {
+                if (!col2.contains(elem)) {
+                    if (ret == null) {
+                        ret = new ArrayList<>();
+                    }
+
+                    ret.add(elem);
+                }
+            }
+        }
+
+        return ret;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/atlas/blob/529863a1/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java 
b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index d9b1a41..3998932 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -36,6 +36,8 @@ import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.impexp.ExportImportAuditEntry;
 import org.apache.atlas.model.impexp.MigrationStatus;
+import org.apache.atlas.model.instance.AtlasCheckStateRequest;
+import org.apache.atlas.model.instance.AtlasCheckStateResult;
 import org.apache.atlas.model.metrics.AtlasMetrics;
 import org.apache.atlas.repository.impexp.AtlasServerService;
 import org.apache.atlas.repository.impexp.ExportImportAuditService;
@@ -44,6 +46,7 @@ import org.apache.atlas.repository.impexp.ImportService;
 import org.apache.atlas.repository.impexp.MigrationProgressService;
 import org.apache.atlas.repository.impexp.ZipSink;
 import org.apache.atlas.repository.impexp.ZipSource;
+import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.services.MetricsService;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
@@ -129,10 +132,11 @@ public class AdminResource {
     private final  ImportService            importService;
     private final  SearchTracker            activeSearches;
     private final  AtlasTypeRegistry        typeRegistry;
-    private final MigrationProgressService migrationProgressService;
+    private final  MigrationProgressService migrationProgressService;
     private final  ReentrantLock            importExportOperationLock;
     private final  ExportImportAuditService exportImportAuditService;
     private final  AtlasServerService       atlasServerService;
+    private final  AtlasEntityStore         entityStore;
 
     static {
         try {
@@ -147,16 +151,17 @@ public class AdminResource {
                          ExportService exportService, ImportService 
importService, SearchTracker activeSearches,
                          MigrationProgressService migrationProgressService,
                          AtlasServerService serverService,
-                         ExportImportAuditService exportImportAuditService) {
-        this.serviceState               = serviceState;
-        this.metricsService             = metricsService;
-        this.exportService = exportService;
-        this.importService = importService;
-        this.activeSearches = activeSearches;
-        this.typeRegistry = typeRegistry;
-        this.migrationProgressService = migrationProgressService;
-        this.atlasServerService = serverService;
-        this.exportImportAuditService = exportImportAuditService;
+                         ExportImportAuditService exportImportAuditService, 
AtlasEntityStore entityStore) {
+        this.serviceState              = serviceState;
+        this.metricsService            = metricsService;
+        this.exportService             = exportService;
+        this.importService             = importService;
+        this.activeSearches            = activeSearches;
+        this.typeRegistry              = typeRegistry;
+        this.migrationProgressService  = migrationProgressService;
+        this.atlasServerService        = serverService;
+        this.entityStore               = entityStore;
+        this.exportImportAuditService  = exportImportAuditService;
         this.importExportOperationLock = new ReentrantLock();
     }
 
@@ -530,6 +535,26 @@ public class AdminResource {
         return null != terminate;
     }
 
+    @POST
+    @Path("checkstate")
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    @Consumes(Servlets.JSON_MEDIA_TYPE)
+    public AtlasCheckStateResult checkState(AtlasCheckStateRequest request) 
throws AtlasBaseException {
+        AtlasPerfTracer perf = null;
+
+        try {
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "checkState(" + 
request + ")");
+            }
+
+            AtlasCheckStateResult ret = entityStore.checkState(request);
+
+            return ret;
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+    }
+
     private String getEditableEntityTypes(Configuration config) {
         String ret = DEFAULT_EDITABLE_ENTITY_TYPES;
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/529863a1/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java 
b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
index 2dbc702..223a90a 100644
--- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java
@@ -51,7 +51,7 @@ public class AdminResourceTest {
 
         
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
 
-        AdminResource adminResource = new AdminResource(serviceState, null, 
null, null, null, null, null, null, null);
+        AdminResource adminResource = new AdminResource(serviceState, null, 
null, null, null, null, null, null, null, null);
         Response response = adminResource.getStatus();
         assertEquals(response.getStatus(), HttpServletResponse.SC_OK);
         JsonNode entity = AtlasJson.parseToV1JsonNode((String) 
response.getEntity());
@@ -62,7 +62,7 @@ public class AdminResourceTest {
     public void testResourceGetsValueFromServiceState() throws IOException {
         
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE);
 
-        AdminResource adminResource = new AdminResource(serviceState, null, 
null, null, null, null, null, null, null);
+        AdminResource adminResource = new AdminResource(serviceState, null, 
null, null, null, null, null, null, null, null);
         Response response = adminResource.getStatus();
 
         verify(serviceState).getState();
@@ -70,4 +70,4 @@ public class AdminResourceTest {
         assertEquals(entity.get("Status").asText(), "PASSIVE");
 
     }
-}
\ No newline at end of file
+}

Reply via email to