This is an automated email from the ASF dual-hosted git repository.
sarath pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new d2436b0 ATLAS-3566: improvements in upgrade patches, to avoid full-scan
d2436b0 is described below
commit d2436b02a618d6a8ebad54df4affaf45a61a6206
Author: Madhan Neethiraj <[email protected]>
AuthorDate: Thu Dec 19 15:59:43 2019 -0800
ATLAS-3566: improvements in upgrade patches, to avoid full-scan
Signed-off-by: Sarath Subramanian <[email protected]>
(cherry picked from commit e2ba3051d6c18d84b79d803490227d612e0e7858)
---
.../patches/ClassificationTextPatch.java | 59 ++++++++++++++--
.../patches/ConcurrentPatchProcessor.java | 79 ++++++++++------------
.../repository/patches/UniqueAttributePatch.java | 67 +++++++++---------
3 files changed, 120 insertions(+), 85 deletions(-)
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java
index 2af50ba..8351942 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/ClassificationTextPatch.java
@@ -18,11 +18,22 @@
package org.apache.atlas.repository.patches;
import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasEdge;
+import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasClassificationType;
import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasTypeRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
public class ClassificationTextPatch extends AtlasPatchHandler {
@@ -57,16 +68,54 @@ public class ClassificationTextPatch extends AtlasPatchHandler {
}
@Override
- protected void processVertexItem(Long vertexId, AtlasVertex vertex,
String typeName, AtlasEntityType entityType) throws AtlasBaseException {
- processItem(vertexId, vertex, typeName, entityType);
+ protected void prepareForExecution() {
+ //do nothing
}
@Override
- protected void prepareForExecution() {
- //do nothing
+ public void submitVerticesToUpdate(WorkItemManager manager) {
+ AtlasTypeRegistry typeRegistry = getTypeRegistry();
+ AtlasGraph graph = getGraph();
+ Set<Long> vertexIds = new HashSet<>();
+
+ for (AtlasClassificationType classificationType :
typeRegistry.getAllClassificationTypes()) {
+ LOG.info("finding classification of type {}",
classificationType.getTypeName());
+
+ Iterable<AtlasVertex> iterable =
graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY,
classificationType.getTypeName()).vertices();
+ int count = 0;
+
+ for (Iterator<AtlasVertex> iter = iterable.iterator();
iter.hasNext(); ) {
+ AtlasVertex classificationVertex = iter.next();
+ Iterable<AtlasEdge> edges =
classificationVertex.getEdges(AtlasEdgeDirection.IN);
+
+ for (AtlasEdge edge : edges) {
+ AtlasVertex entityVertex = edge.getOutVertex();
+ Long vertexId = (Long) entityVertex.getId();
+
+ if (vertexIds.contains(vertexId)) {
+ continue;
+ }
+
+ vertexIds.add(vertexId);
+
+ manager.checkProduce(vertexId);
+ }
+
+ count++;
+ }
+
+ LOG.info("found {} classification of type {}", count,
classificationType.getTypeName());
+ }
+
+ LOG.info("found {} entities with classifications",
vertexIds.size());
+ }
+
+ @Override
+ protected void processVertexItem(Long vertexId, AtlasVertex vertex,
String typeName, AtlasEntityType entityType) throws AtlasBaseException {
+ processItem(vertexId, vertex, typeName, entityType);
}
- protected void processItem(Long vertexId, AtlasVertex vertex, String
typeName, AtlasEntityType entityType) throws AtlasBaseException {
+ private void processItem(Long vertexId, AtlasVertex vertex, String
typeName, AtlasEntityType entityType) throws AtlasBaseException {
if(LOG.isDebugEnabled()) {
LOG.debug("processItem(typeName={}, vertexId={})", typeName,
vertexId);
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java b/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
index 3eedb98..5a9ac2a 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
@@ -18,7 +18,6 @@
package org.apache.atlas.repository.patches;
import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.pc.WorkItemBuilder;
@@ -30,47 +29,37 @@ import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphMapper;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
public abstract class ConcurrentPatchProcessor {
private static final Logger LOG =
LoggerFactory.getLogger(ConcurrentPatchProcessor.class);
- private static final String NUM_WORKERS_PROPERTY =
"atlas.patch.numWorkers";
- private static final String BATCH_SIZE_PROPERTY =
"atlas.patch.batchSize";
- private static final String ATLAS_SOLR_SHARDS = "ATLAS_SOLR_SHARDS";
- private static final String WORKER_NAME_PREFIX = "patchWorkItem";
- private static final int NUM_WORKERS;
- private static final int BATCH_SIZE;
- private final EntityGraphMapper entityGraphMapper;
+ private static final String NUM_WORKERS_PROPERTY =
"atlas.patch.numWorkers";
+ private static final String BATCH_SIZE_PROPERTY = "atlas.patch.batchSize";
+ private static final String ATLAS_SOLR_SHARDS = "ATLAS_SOLR_SHARDS";
+ private static final String WORKER_NAME_PREFIX = "patchWorkItem";
+ private static final int NUM_WORKERS;
+ private static final int BATCH_SIZE;
- public AtlasGraph getGraph() {
- return graph;
- }
-
- public GraphBackedSearchIndexer getIndexer() {
- return indexer;
- }
-
- public AtlasTypeRegistry getTypeRegistry() {
- return typeRegistry;
- }
-
- private final AtlasGraph graph;
+ private final EntityGraphMapper entityGraphMapper;
+ private final AtlasGraph graph;
private final GraphBackedSearchIndexer indexer;
- private final AtlasTypeRegistry typeRegistry;
+ private final AtlasTypeRegistry typeRegistry;
static {
int numWorkers = 3;
int batchSize = 300;
try {
- numWorkers =
ApplicationProperties.get().getInt(NUM_WORKERS_PROPERTY,
getDefaultNumWorkers());
- batchSize =
ApplicationProperties.get().getInt(BATCH_SIZE_PROPERTY, 300);
+ Configuration config = ApplicationProperties.get();
+
+ numWorkers = config.getInt(NUM_WORKERS_PROPERTY,
config.getInt(ATLAS_SOLR_SHARDS, 1) * 3);
+ batchSize = config.getInt(BATCH_SIZE_PROPERTY, 300);
LOG.info("UniqueAttributePatch: {}={}, {}={}",
NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
} catch (Exception e) {
@@ -81,10 +70,6 @@ public abstract class ConcurrentPatchProcessor {
BATCH_SIZE = batchSize;
}
- private static int getDefaultNumWorkers() throws AtlasException {
- return ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1) * 3;
- }
-
public ConcurrentPatchProcessor(PatchContext context) {
this.graph = context.getGraph();
this.indexer = context.getIndexer();
@@ -95,21 +80,34 @@ public abstract class ConcurrentPatchProcessor {
public EntityGraphMapper getEntityGraphMapper() {
return entityGraphMapper;
}
+
+ public AtlasGraph getGraph() {
+ return graph;
+ }
+
+ public GraphBackedSearchIndexer getIndexer() {
+ return indexer;
+ }
+
+ public AtlasTypeRegistry getTypeRegistry() {
+ return typeRegistry;
+ }
+
public void apply() throws AtlasBaseException {
prepareForExecution();
execute();
}
+ protected abstract void prepareForExecution() throws AtlasBaseException;
+ protected abstract void submitVerticesToUpdate(WorkItemManager manager);
+ protected abstract void processVertexItem(Long vertexId, AtlasVertex
vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException;
+
private void execute() {
- Iterable<Object> iterable = graph.query().vertexIds();
- WorkItemManager manager = new WorkItemManager(
- new ConsumerBuilder(graph, typeRegistry, this),
WORKER_NAME_PREFIX,
- BATCH_SIZE, NUM_WORKERS, false);
+ WorkItemManager manager = new WorkItemManager(new
ConsumerBuilder(graph, typeRegistry, this),
+ WORKER_NAME_PREFIX,
BATCH_SIZE, NUM_WORKERS, false);
+
try {
- for (Iterator<Object> iter = iterable.iterator(); iter.hasNext();
) {
- Object vertexId = iter.next();
- submitForProcessing((Long) vertexId, manager);
- }
+ submitVerticesToUpdate(manager);
manager.drain();
} finally {
@@ -121,10 +119,6 @@ public abstract class ConcurrentPatchProcessor {
}
}
- private void submitForProcessing(Long vertexId, WorkItemManager manager) {
- manager.checkProduce(vertexId);
- }
-
private static class ConsumerBuilder implements WorkItemBuilder<Consumer,
Long> {
private final AtlasTypeRegistry typeRegistry;
private final AtlasGraph graph;
@@ -228,7 +222,4 @@ public abstract class ConcurrentPatchProcessor {
}
}
}
-
- protected abstract void processVertexItem(Long vertexId, AtlasVertex
vertex, String typeName, AtlasEntityType entityType) throws AtlasBaseException;
- protected abstract void prepareForExecution() throws AtlasBaseException;
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
index bd5e32b..2b58119 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
@@ -17,25 +17,24 @@
*/
package org.apache.atlas.repository.patches;
-import org.apache.atlas.ApplicationProperties;
-import org.apache.atlas.AtlasException;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.IndexException;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind;
-import org.apache.atlas.repository.graphdb.AtlasCardinality;
-import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
-import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException;
-import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.graphdb.*;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
+import java.util.Iterator;
import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
import static
org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
@@ -66,31 +65,37 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
}
public static class UniqueAttributePatchProcessor extends
ConcurrentPatchProcessor {
- private static final String NUM_WORKERS_PROPERTY =
"atlas.patch.unique_attribute_patch.numWorkers";
- private static final String BATCH_SIZE_PROPERTY =
"atlas.patch.unique_attribute_patch.batchSize";
- private static final String ATLAS_SOLR_SHARDS = "ATLAS_SOLR_SHARDS";
- private static final int NUM_WORKERS;
- private static final int BATCH_SIZE;
+ public UniqueAttributePatchProcessor(PatchContext context) {
+ super(context);
+ }
- static {
- int numWorkers = 3;
- int batchSize = 300;
+ @Override
+ protected void prepareForExecution() {
+ //create the new attribute for all unique attributes.
+ createIndexForUniqueAttributes();
+ }
- try {
- numWorkers =
ApplicationProperties.get().getInt(NUM_WORKERS_PROPERTY,
getDefaultNumWorkers());
- batchSize =
ApplicationProperties.get().getInt(BATCH_SIZE_PROPERTY, 300);
+ @Override
+ public void submitVerticesToUpdate(WorkItemManager manager) {
+ AtlasTypeRegistry typeRegistry = getTypeRegistry();
+ AtlasGraph graph = getGraph();
- LOG.info("UniqueAttributePatch: {}={}, {}={}",
NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
- } catch (Exception e) {
- LOG.error("Error retrieving configuration.", e);
- }
+ for (AtlasEntityType entityType :
typeRegistry.getAllEntityTypes()) {
+ LOG.info("finding entities of type {}",
entityType.getTypeName());
- NUM_WORKERS = numWorkers;
- BATCH_SIZE = batchSize;
- }
+ Iterable<Object> iterable =
graph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY,
entityType.getTypeName()).vertexIds();
+ int count = 0;
- public UniqueAttributePatchProcessor(PatchContext context) {
- super(context);
+ for (Iterator<Object> iter = iterable.iterator();
iter.hasNext(); ) {
+ Object vertexId = iter.next();
+
+ manager.checkProduce((Long) vertexId);
+
+ count++;
+ }
+
+ LOG.info("found {} entities of type {}", count,
entityType.getTypeName());
+ }
}
@Override
@@ -99,12 +104,6 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
processItem(vertexId, vertex, typeName, entityType);
}
- @Override
- protected void prepareForExecution() {
- //create the new attribute for all unique attributes.
- createIndexForUniqueAttributes();
- }
-
private void createIndexForUniqueAttributes() {
for (AtlasEntityType entityType :
getTypeRegistry().getAllEntityTypes()) {
@@ -157,10 +156,6 @@ public class UniqueAttributePatch extends AtlasPatchHandler {
}
}
- private static int getDefaultNumWorkers() throws AtlasException {
- return ApplicationProperties.get().getInt(ATLAS_SOLR_SHARDS, 1) *
3;
- }
-
protected void processItem(Long vertexId, AtlasVertex vertex, String
typeName, AtlasEntityType entityType) {
LOG.debug("processItem(typeName={}, vertexId={})", typeName,
vertexId);