This is an automated email from the ASF dual-hosted git repository.

sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new d2436b0  ATLAS-3566: improvements in upgrade patches, to avoid full-scan
d2436b0 is described below

commit d2436b02a618d6a8ebad54df4affaf45a61a6206
Author: Madhan Neethiraj <[email protected]>
AuthorDate: Thu Dec 19 15:59:43 2019 -0800

    ATLAS-3566: improvements in upgrade patches, to avoid full-scan
    
    Signed-off-by: Sarath Subramanian <[email protected]>
    (cherry picked from commit e2ba3051d6c18d84b79d803490227d612e0e7858)
---
 .../patches/ClassificationTextPatch.java           | 59 ++++++++++++++--
 .../patches/ConcurrentPatchProcessor.java          | 79 ++++++++++------------
 .../repository/patches/UniqueAttributePatch.java   | 67 +++++++++---------
 3 files changed, 120 insertions(+), 85 deletions(-)

diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java
index 2af50ba..8351942 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java
@@ -18,11 +18,22 @@
 package org.apache.atlas.repository.patches;
 
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasClassificationType;
 import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
 import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
 
 public class ClassificationTextPatch extends AtlasPatchHandler {
@@ -57,16 +68,54 @@ public class ClassificationTextPatch extends AtlasPatchHandler {
         }
 
         @Override
-        protected void processVertexItem(Long vertexId, AtlasVertex vertex, 
String typeName, AtlasEntityType entityType) throws AtlasBaseException {
-            processItem(vertexId, vertex, typeName, entityType);
+        protected void prepareForExecution() {
+            //do nothing
         }
 
         @Override
-        protected void prepareForExecution() {
-            //do nothing
+        public void submitVerticesToUpdate(WorkItemManager manager) {
+            AtlasTypeRegistry typeRegistry = getTypeRegistry();
+            AtlasGraph        graph        = getGraph();
+            Set<Long>         vertexIds    = new HashSet<>();
+
+            for (AtlasClassificationType classificationType : 
typeRegistry.getAllClassificationTypes()) {
+                LOG.info("finding classification of type {}", 
classificationType.getTypeName());
+
+                Iterable<AtlasVertex> iterable = 
graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, 
classificationType.getTypeName()).vertices();
+                int                   count    = 0;
+
+                for (Iterator<AtlasVertex> iter = iterable.iterator(); 
iter.hasNext(); ) {
+                    AtlasVertex         classificationVertex = iter.next();
+                    Iterable<AtlasEdge> edges                = 
classificationVertex.getEdges(AtlasEdgeDirection.IN);
+
+                    for (AtlasEdge edge : edges) {
+                        AtlasVertex entityVertex = edge.getOutVertex();
+                        Long        vertexId     = (Long) entityVertex.getId();
+
+                        if (vertexIds.contains(vertexId)) {
+                            continue;
+                        }
+
+                        vertexIds.add(vertexId);
+
+                        manager.checkProduce(vertexId);
+                    }
+
+                    count++;
+                }
+
+                LOG.info("found {} classification of type {}", count, 
classificationType.getTypeName());
+            }
+
+            LOG.info("found {} entities with classifications", 
vertexIds.size());
+        }
+
+        @Override
+        protected void processVertexItem(Long vertexId, AtlasVertex vertex, 
String typeName, AtlasEntityType entityType) throws AtlasBaseException {
+            processItem(vertexId, vertex, typeName, entityType);
         }
 
-        protected void processItem(Long vertexId, AtlasVertex vertex, String 
typeName, AtlasEntityType entityType) throws AtlasBaseException {
+        private void processItem(Long vertexId, AtlasVertex vertex, String 
typeName, AtlasEntityType entityType) throws AtlasBaseException {
             if(LOG.isDebugEnabled()) {
                 LOG.debug("processItem(typeName={}, vertexId={})", typeName, 
vertexId);
             }
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java b/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
index 3eedb98..5a9ac2a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
@@ -18,7 +18,6 @@
 package org.apache.atlas.repository.patches;
 
 import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.pc.WorkItemBuilder;
@@ -30,47 +29,37 @@ import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
 import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
 public abstract class ConcurrentPatchProcessor {
     private static final Logger LOG = 
LoggerFactory.getLogger(ConcurrentPatchProcessor.class);
 
-    private static final String     NUM_WORKERS_PROPERTY = 
"atlas.patch.numWorkers";
-    private static final String     BATCH_SIZE_PROPERTY  = 
"atlas.patch.batchSize";
-    private static final String     ATLAS_SOLR_SHARDS    = "ATLAS_SOLR_SHARDS";
-    private static final String     WORKER_NAME_PREFIX   = "patchWorkItem";
-    private static final int        NUM_WORKERS;
-    private static final int        BATCH_SIZE;
-    private final EntityGraphMapper entityGraphMapper;
+    private static final String NUM_WORKERS_PROPERTY = 
"atlas.patch.numWorkers";
+    private static final String BATCH_SIZE_PROPERTY  = "atlas.patch.batchSize";
+    private static final String ATLAS_SOLR_SHARDS    = "ATLAS_SOLR_SHARDS";
+    private static final String WORKER_NAME_PREFIX   = "patchWorkItem";
+    private static final int    NUM_WORKERS;
+    private static final int    BATCH_SIZE;
 
-    public AtlasGraph getGraph() {
-        return graph;
-    }
-
-    public GraphBackedSearchIndexer getIndexer() {
-        return indexer;
-    }
-
-    public AtlasTypeRegistry getTypeRegistry() {
-        return typeRegistry;
-    }
-
-    private final AtlasGraph graph;
+    private final EntityGraphMapper        entityGraphMapper;
+    private final AtlasGraph               graph;
     private final GraphBackedSearchIndexer indexer;
-    private final AtlasTypeRegistry typeRegistry;
+    private final AtlasTypeRegistry        typeRegistry;
 
     static {
         int numWorkers = 3;
         int batchSize  = 300;
 
         try {
-            numWorkers = 
ApplicationProperties.get().getInt(NUM_WORKERS_PROPERTY, 
getDefaultNumWorkers());
-            batchSize  = 
ApplicationProperties.get().getInt(BATCH_SIZE_PROPERTY, 300);
+            Configuration config = ApplicationProperties.get();
+
+            numWorkers = config.getInt(NUM_WORKERS_PROPERTY, 
config.getInt(ATLAS_SOLR_SHARDS, 1) * 3);
+            batchSize  = config.getInt(BATCH_SIZE_PROPERTY, 300);
 
             LOG.info("UniqueAttributePatch: {}={}, {}={}", 
NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
         } catch (Exception e) {
@@ -81,10 +70,6 @@ public abstract class ConcurrentPatchProcessor {
         BATCH_SIZE  = batchSize;
     }
 
-    private static int getDefaultNumWorkers() throws AtlasException {
-        return ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1) * 3;
-    }
-
     public ConcurrentPatchProcessor(PatchContext context) {
         this.graph             = context.getGraph();
         this.indexer           = context.getIndexer();
@@ -95,21 +80,34 @@ public abstract class ConcurrentPatchProcessor {
     public EntityGraphMapper getEntityGraphMapper() {
         return entityGraphMapper;
     }
+
+    public AtlasGraph getGraph() {
+        return graph;
+    }
+
+    public GraphBackedSearchIndexer getIndexer() {
+        return indexer;
+    }
+
+    public AtlasTypeRegistry getTypeRegistry() {
+        return typeRegistry;
+    }
+
     public void apply() throws AtlasBaseException {
         prepareForExecution();
         execute();
     }
 
+    protected abstract void prepareForExecution() throws AtlasBaseException;
+    protected abstract void submitVerticesToUpdate(WorkItemManager manager);
+    protected abstract void processVertexItem(Long vertexId, AtlasVertex 
vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException;
+
     private void execute() {
-        Iterable<Object> iterable = graph.query().vertexIds();
-        WorkItemManager manager = new WorkItemManager(
-                new ConsumerBuilder(graph, typeRegistry, this), 
WORKER_NAME_PREFIX,
-                BATCH_SIZE, NUM_WORKERS, false);
+        WorkItemManager manager = new WorkItemManager(new 
ConsumerBuilder(graph, typeRegistry, this),
+                                                      WORKER_NAME_PREFIX, 
BATCH_SIZE, NUM_WORKERS, false);
+
         try {
-            for (Iterator<Object> iter = iterable.iterator(); iter.hasNext(); 
) {
-                Object vertexId = iter.next();
-                submitForProcessing((Long) vertexId, manager);
-            }
+            submitVerticesToUpdate(manager);
 
             manager.drain();
         } finally {
@@ -121,10 +119,6 @@ public abstract class ConcurrentPatchProcessor {
         }
     }
 
-    private void submitForProcessing(Long vertexId, WorkItemManager manager) {
-        manager.checkProduce(vertexId);
-    }
-
     private static class ConsumerBuilder implements WorkItemBuilder<Consumer, 
Long> {
         private final AtlasTypeRegistry typeRegistry;
         private final AtlasGraph graph;
@@ -228,7 +222,4 @@ public abstract class ConcurrentPatchProcessor {
             }
         }
     }
-
-    protected abstract void processVertexItem(Long vertexId, AtlasVertex 
vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException;
-    protected abstract void prepareForExecution() throws AtlasBaseException;
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
index bd5e32b..2b58119 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
@@ -17,25 +17,24 @@
  */
 package org.apache.atlas.repository.patches;
 
-import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.IndexException;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
-import org.apache.atlas.repository.graphdb.AtlasCardinality;
-import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
-import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.graphdb.*;
 import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
 import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
 import org.apache.atlas.type.AtlasEntityType;
 import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.collections.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
+import java.util.Iterator;
 
 import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
 import static 
org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
@@ -66,31 +65,37 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
     }
 
     public static class UniqueAttributePatchProcessor extends 
ConcurrentPatchProcessor {
-        private static final String NUM_WORKERS_PROPERTY = 
"atlas.patch.unique_attribute_patch.numWorkers";
-        private static final String BATCH_SIZE_PROPERTY  = 
"atlas.patch.unique_attribute_patch.batchSize";
-        private static final String ATLAS_SOLR_SHARDS    = "ATLAS_SOLR_SHARDS";
-        private static final int    NUM_WORKERS;
-        private static final int    BATCH_SIZE;
+        public UniqueAttributePatchProcessor(PatchContext context) {
+            super(context);
+        }
 
-        static {
-            int numWorkers = 3;
-            int batchSize  = 300;
+        @Override
+        protected void prepareForExecution() {
+            //create the new attribute for all unique attributes.
+            createIndexForUniqueAttributes();
+        }
 
-            try {
-                numWorkers = 
ApplicationProperties.get().getInt(NUM_WORKERS_PROPERTY, 
getDefaultNumWorkers());
-                batchSize  = 
ApplicationProperties.get().getInt(BATCH_SIZE_PROPERTY, 300);
+        @Override
+        public void submitVerticesToUpdate(WorkItemManager manager) {
+            AtlasTypeRegistry typeRegistry = getTypeRegistry();
+            AtlasGraph        graph        = getGraph();
 
-                LOG.info("UniqueAttributePatch: {}={}, {}={}", 
NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
-            } catch (Exception e) {
-                LOG.error("Error retrieving configuration.", e);
-            }
+            for (AtlasEntityType entityType : 
typeRegistry.getAllEntityTypes()) {
+                LOG.info("finding entities of type {}", 
entityType.getTypeName());
 
-            NUM_WORKERS = numWorkers;
-            BATCH_SIZE  = batchSize;
-        }
+                Iterable<Object> iterable = 
graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, 
entityType.getTypeName()).vertexIds();
+                int              count    = 0;
 
-        public UniqueAttributePatchProcessor(PatchContext context) {
-            super(context);
+                for (Iterator<Object> iter = iterable.iterator(); 
iter.hasNext(); ) {
+                    Object vertexId = iter.next();
+
+                    manager.checkProduce((Long) vertexId);
+
+                    count++;
+                }
+
+                LOG.info("found {} entities of type {}", count, 
entityType.getTypeName());
+            }
         }
 
         @Override
@@ -99,12 +104,6 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
             processItem(vertexId, vertex, typeName, entityType);
         }
 
-        @Override
-        protected void prepareForExecution() {
-            //create the new attribute for all unique attributes.
-            createIndexForUniqueAttributes();
-        }
-
         private void createIndexForUniqueAttributes() {
             for (AtlasEntityType entityType : 
getTypeRegistry().getAllEntityTypes()) {
 
@@ -157,10 +156,6 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
             }
         }
 
-        private static int getDefaultNumWorkers() throws AtlasException {
-            return ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1) * 
3;
-        }
-
         protected void processItem(Long vertexId, AtlasVertex vertex, String 
typeName, AtlasEntityType entityType) {
             LOG.debug("processItem(typeName={}, vertexId={})", typeName, 
vertexId);
 

Reply via email to