This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 43c9e32c89263a8e98c98dccc47f6b5ae7a3ceba Author: Ashutosh Mestry <[email protected]> AuthorDate: Wed Apr 29 16:32:29 2020 -0700 ATLAS-3762: Improve Edge creator using Genuine iterator. (cherry picked from commit 25f3002e0e84927eb39cebb5708d77ef81755d79) --- .../atlas/repository/graphdb/AtlasVertex.java | 10 +++++ .../repository/graphdb/janus/AtlasJanusGraph.java | 36 +++++++++++++++--- .../repository/graphdb/janus/AtlasJanusVertex.java | 17 +++++++++ .../apache/atlas/repository/graph/GraphHelper.java | 25 ++++++++++++- .../store/graph/v2/AtlasRelationshipStoreV2.java | 43 ++++++++++++++++------ 5 files changed, 111 insertions(+), 20 deletions(-) diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertex.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertex.java index 9406e26..20e6177 100644 --- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertex.java +++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertex.java @@ -45,6 +45,16 @@ public interface AtlasVertex<V, E> extends AtlasElement { */ Iterable<AtlasEdge<V, E>> getEdges(AtlasEdgeDirection direction, String[] edgeLabels); + long getEdgesCount(AtlasEdgeDirection direction, String edgeLabel); + + /** + * Does vertex have edges specified by the direction and label + * @param dir + * @param edgeLabel + * @return + */ + boolean hasEdges(AtlasEdgeDirection dir, String edgeLabel); + /** * Gets the edges associated with this vertex going the * specified direction. diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java index eb02062..a30dbc7 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java @@ -18,6 +18,7 @@ package org.apache.atlas.repository.graphdb.janus; import com.google.common.base.Function; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.atlas.ApplicationProperties; @@ -25,7 +26,19 @@ import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.groovy.GroovyExpression; -import org.apache.atlas.repository.graphdb.*; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasGraphIndexClient; +import org.apache.atlas.repository.graphdb.AtlasGraphManagement; +import org.apache.atlas.repository.graphdb.AtlasGraphQuery; +import org.apache.atlas.repository.graphdb.AtlasGraphTraversal; +import org.apache.atlas.repository.graphdb.AtlasIndexQuery; +import org.apache.atlas.repository.graphdb.AtlasIndexQueryParameter; +import org.apache.atlas.repository.graphdb.AtlasPropertyKey; +import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.graphdb.GraphIndexQueryParameters; +import org.apache.atlas.repository.graphdb.GremlinVersion; import org.apache.atlas.repository.graphdb.janus.query.AtlasJanusGraphQuery; import org.apache.atlas.repository.graphdb.utils.IteratorToIterableAdapter; import org.apache.atlas.type.AtlasType; @@ -62,9 +75,12 @@ import javax.script.ScriptEngine; import javax.script.ScriptException; import java.io.IOException; import java.io.OutputStream; -import java.util.*; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_DEFAULT; import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_PROPERTY; @@ -403,7 +419,11 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE } public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>> wrapVertices(Iterable<? extends Vertex> it) { - return StreamSupport.stream(it.spliterator(), false).map(input -> GraphDbObjectFactory.createVertex(AtlasJanusGraph.this, input)).collect(Collectors.toList()); + + return Iterables.transform(it, + (Function<Vertex, AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>>) input -> + GraphDbObjectFactory.createVertex(AtlasJanusGraph.this, input)); + } public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> wrapEdges(Iterator<? extends Edge> it) { @@ -413,7 +433,11 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE } public Iterable<AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>> wrapEdges(Iterable<? extends Edge> it) { - return StreamSupport.stream(it.spliterator(), false).map(input -> GraphDbObjectFactory.createEdge(AtlasJanusGraph.this, input)).collect(Collectors.toList()); + + return Iterables.transform(it, + (Function<Edge, AtlasEdge<AtlasJanusVertex, AtlasJanusEdge>>) input -> + GraphDbObjectFactory.createEdge(AtlasJanusGraph.this, input)); + } public void addMultiProperties(Set<String> names) { diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertex.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertex.java index fdc9fd0..b6e2c26 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertex.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertex.java @@ -20,12 +20,14 @@ package org.apache.atlas.repository.graphdb.janus; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import java.util.stream.StreamSupport; import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; import org.apache.atlas.repository.graphdb.AtlasSchemaViolationException; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.repository.graphdb.AtlasVertexQuery; +import org.apache.atlas.repository.graphdb.utils.IteratorToIterableAdapter; import org.apache.tinkerpop.gremlin.structure.Direction; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Vertex; @@ -78,6 +80,21 @@ public class AtlasJanusVertex extends AtlasJanusElement<Vertex> implements Atlas return graph.wrapEdges(edges); } + @Override + public long getEdgesCount(AtlasEdgeDirection dir, String edgeLabel) { + Direction direction = AtlasJanusObjectFactory.createDirection(dir); + Iterator<Edge> it = getWrappedElement().edges(direction, edgeLabel); + IteratorToIterableAdapter<Edge> iterable = new IteratorToIterableAdapter<>(it); + return StreamSupport.stream(iterable.spliterator(), true).count(); + } + + @Override + public boolean hasEdges(AtlasEdgeDirection dir, String edgeLabel) { + Direction direction = AtlasJanusObjectFactory.createDirection(dir); + Iterator<Edge> edges = getWrappedElement().edges(direction, edgeLabel); + return edges.hasNext(); + } + private JanusGraphVertex getAsJanusVertex() { return (JanusGraphVertex)getWrappedElement(); } diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index 2b8227a..b9e3a5e 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -35,7 +35,6 @@ import org.apache.atlas.repository.graphdb.AtlasVertexQuery; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.type.AtlasArrayType; import org.apache.atlas.type.AtlasMapType; -import org.apache.atlas.util.AtlasGremlinQueryProvider; import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.atlas.v1.model.instance.Id; import org.apache.atlas.v1.model.instance.Referenceable; @@ -72,7 +71,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.UUID; import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE; import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; @@ -269,6 +267,21 @@ public final class GraphHelper { return ret; } + public static long getAdjacentEdgesCountByLabel(AtlasVertex instanceVertex, AtlasEdgeDirection direction, final String edgeLabel) { + AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("getAdjacentEdgesCountByLabel"); + if (LOG.isDebugEnabled()) { + LOG.debug("Finding edges for {} with label {}", string(instanceVertex), edgeLabel); + } + + long ret = 0; + if(instanceVertex != null && edgeLabel != null) { + ret = instanceVertex.getEdgesCount(direction, edgeLabel); + } + + RequestContext.get().endMetricRecord(metric); + return ret; + } + public static boolean isPropagationEnabled(AtlasVertex classificationVertex) { boolean ret = false; @@ -437,6 +450,14 @@ public final class GraphHelper { return getAdjacentEdgesByLabel(instanceVertex, AtlasEdgeDirection.OUT, edgeLabel); } + public static long getOutGoingEdgesCountByLabel(AtlasVertex instanceVertex, String edgeLabel) { + return getAdjacentEdgesCountByLabel(instanceVertex, AtlasEdgeDirection.OUT, edgeLabel); + } + + public static long getInComingEdgesCountByLabel(AtlasVertex instanceVertex, String edgeLabel) { + return getAdjacentEdgesCountByLabel(instanceVertex, AtlasEdgeDirection.IN, edgeLabel); + } + public AtlasEdge getEdgeForLabel(AtlasVertex vertex, String edgeLabel, AtlasRelationshipEdgeDirection edgeDirection) { AtlasEdge ret; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java index d1c1f12..ab431bc 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipStoreV2.java @@ -19,6 +19,7 @@ package org.apache.atlas.repository.store.graph.v2; import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.RequestContext; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.authorize.AtlasAuthorizationUtils; import org.apache.atlas.authorize.AtlasPrivilege; @@ -49,6 +50,7 @@ import org.apache.atlas.type.AtlasRelationshipType; import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeUtil; +import org.apache.atlas.utils.AtlasPerfMetrics; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; @@ -67,24 +69,25 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.UUID; +import java.util.function.Function; +import static org.apache.atlas.AtlasConfiguration.NOTIFICATION_RELATIONSHIPS_ENABLED; import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE; import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED; -import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.*; +import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.BOTH; +import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.NONE; +import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.ONE_TO_TWO; +import static org.apache.atlas.model.typedef.AtlasRelationshipDef.PropagateTags.TWO_TO_ONE; import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.HOME_ID_KEY; import static org.apache.atlas.repository.Constants.PROVENANCE_TYPE_KEY; import static org.apache.atlas.repository.Constants.RELATIONSHIPTYPE_TAG_PROPAGATION_KEY; import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.VERSION_PROPERTY_KEY; -import static org.apache.atlas.AtlasConfiguration.NOTIFICATION_RELATIONSHIPS_ENABLED; - - import static org.apache.atlas.repository.graph.GraphHelper.getBlockedClassificationIds; import static org.apache.atlas.repository.graph.GraphHelper.getClassificationEntityGuid; import static org.apache.atlas.repository.graph.GraphHelper.getClassificationName; import static org.apache.atlas.repository.graph.GraphHelper.getPropagatableClassifications; -import static org.apache.atlas.repository.graph.GraphHelper.getIncomingEdgesByLabel; import static org.apache.atlas.repository.graph.GraphHelper.getPropagateTags; import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getState; import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getTypeName; @@ -776,23 +779,39 @@ public class AtlasRelationshipStoreV2 implements AtlasRelationshipStore { } public AtlasEdge getRelationshipEdge(AtlasVertex fromVertex, AtlasVertex toVertex, String relationshipLabel) { - AtlasEdge ret = null; - Iterator<AtlasEdge> edgesIterator = getIncomingEdgesByLabel(toVertex, relationshipLabel); + AtlasPerfMetrics.MetricRecorder metric = RequestContext.get().startMetricRecord("getRelationshipEdge"); + + AtlasEdge ret = null; + + if (toVertex.hasEdges(AtlasEdgeDirection.IN, relationshipLabel) && fromVertex.hasEdges(AtlasEdgeDirection.OUT, relationshipLabel)) { + long fromVertexOutgoingEdgeCount = graphHelper.getOutGoingEdgesCountByLabel(fromVertex, relationshipLabel); + long toVertexIncomingEdgeCount = graphHelper.getInComingEdgesCountByLabel(toVertex, relationshipLabel); + if (toVertexIncomingEdgeCount < fromVertexOutgoingEdgeCount) { + Iterator<AtlasEdge> edgesIteratorIn = graphHelper.getIncomingEdgesByLabel(toVertex, relationshipLabel); + ret = getActiveEdgeFromList(edgesIteratorIn, fromVertex.getId(), e -> e.getOutVertex().getId()); + } else { + Iterator<AtlasEdge> edgesIteratorOut = graphHelper.getOutGoingEdgesByLabel(fromVertex, relationshipLabel); + ret = getActiveEdgeFromList(edgesIteratorOut, toVertex.getId(), e -> e.getInVertex().getId()); + } + } + RequestContext.get().endMetricRecord(metric); + return ret; + } + + private AtlasEdge getActiveEdgeFromList(Iterator<AtlasEdge> edgesIterator, Object vertexIdToCompare, Function<AtlasEdge, Object> edgeIdFn) { while (edgesIterator != null && edgesIterator.hasNext()) { AtlasEdge edge = edgesIterator.next(); - if (edge != null) { Status status = graphHelper.getStatus(edge); - if ((status == null || status == ACTIVE) && edge.getOutVertex().getId().equals(fromVertex.getId())) { - ret = edge; - break; + if ((status == null || status == ACTIVE) && edgeIdFn.apply(edge).equals(vertexIdToCompare)) { + return edge; } } } - return ret; + return null; } private Long getRelationshipVersion(AtlasRelationship relationship) {
