ATLAS-1119 Add retries for edge label creation (sumasai via shwethags) (cherry picked from commit ab95c1a7bbf1209c22aaf661a69bd007c5277bc8)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/59477783 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/59477783 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/59477783 Branch: refs/heads/0.7-incubating Commit: 59477783701f7a7eaa4758d43af608ef689d7dfb Parents: 75d046c Author: Shwetha GS <[email protected]> Authored: Tue Aug 16 15:14:59 2016 +0530 Committer: Madhan Neethiraj <[email protected]> Committed: Thu Dec 22 16:28:38 2016 -0800 ---------------------------------------------------------------------- release-log.txt | 1 + .../atlas/repository/graph/DeleteHandler.java | 6 +- .../graph/GraphBackedMetadataRepository.java | 2 +- .../atlas/repository/graph/GraphHelper.java | 86 ++++++++++--- .../graph/GraphToTypedInstanceMapper.java | 6 +- .../graph/TypedInstanceToGraphMapper.java | 4 +- .../typestore/GraphBackedTypeStore.java | 2 +- .../repository/graph/GraphHelperMockTest.java | 121 +++++++++++++++++++ .../atlas/repository/graph/GraphHelperTest.java | 2 +- .../typestore/GraphBackedTypeStoreTest.java | 2 +- 10 files changed, 204 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/59477783/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 6871765..ec780fb 100644 --- a/release-log.txt +++ b/release-log.txt @@ -53,6 +53,7 @@ ATLAS-1129 Remove notification failed logs on retry and add sleep between retrie ATLAS-1126 Fix NPE in getSchema calls (sumasai) ATLAS-1125 Enable compression on hbase audit table (shwethags via sumasai) ATLAS-1121 NPE while submitting topology in StormHook (ayubkhan via sumasai) +ATLAS-1119 Add retries for edge label creation (sumasai via shwethags) ATLAS-1111 Data loss is observed when atlas is restarted while hive_table metadata ingestion into kafka topic is in-progress(shwethags via sumasai) ATLAS-1108: updated references to atlas.rest.address to handle multiple URLs ATLAS-1112 Hive table GET response from atlas server had duplicate column entries ( ayubkhan, mneethiraj via sumasai) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/59477783/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java b/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java index 8d31c1b..ac08313 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java @@ -136,7 +136,7 @@ public abstract class DeleteHandler { DataTypes.TypeCategory elementTypeCategory = elementType.getTypeCategory(); if (elementTypeCategory == DataTypes.TypeCategory.STRUCT || elementTypeCategory == DataTypes.TypeCategory.CLASS) { - Iterator<Edge> edges = GraphHelper.getOutGoingEdgesByLabel(instanceVertex, edgeLabel); + Iterator<Edge> edges = graphHelper.getOutGoingEdgesByLabel(instanceVertex, edgeLabel); if (edges != null) { while (edges.hasNext()) { Edge edge = edges.next(); @@ -206,7 +206,7 @@ public abstract class DeleteHandler { public void deleteEdgeReference(Vertex outVertex, String edgeLabel, DataTypes.TypeCategory typeCategory, boolean isComposite) throws AtlasException { - Edge edge = GraphHelper.getEdgeForLabel(outVertex, edgeLabel); + Edge edge = graphHelper.getEdgeForLabel(outVertex, edgeLabel); if (edge != null) { deleteEdgeReference(edge, typeCategory, isComposite, false); } @@ -273,7 +273,7 @@ public abstract class DeleteHandler { case CLASS: //If its class attribute, its the only edge between two vertices if (attributeInfo.multiplicity.nullAllowed()) { - edge = GraphHelper.getEdgeForLabel(outVertex, edgeLabel); + edge = graphHelper.getEdgeForLabel(outVertex, edgeLabel); if (shouldUpdateReverseAttribute) { GraphHelper.setProperty(outVertex, propertyName, null); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/59477783/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java index e301a00..63b29df 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java @@ -267,7 +267,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { try { final String entityTypeName = GraphHelper.getTypeName(instanceVertex); String relationshipLabel = GraphHelper.getTraitLabel(entityTypeName, traitNameToBeDeleted); - Edge edge = GraphHelper.getEdgeForLabel(instanceVertex, relationshipLabel); + Edge edge = graphHelper.getEdgeForLabel(instanceVertex, relationshipLabel); if(edge != null) { deleteHandler.deleteEdgeReference(edge, DataTypes.TypeCategory.TRAIT, false, true); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/59477783/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index 2ef50c9..334177c 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -30,9 +30,11 @@ import com.tinkerpop.blueprints.Element; import com.tinkerpop.blueprints.Graph; import com.tinkerpop.blueprints.GraphQuery; import com.tinkerpop.blueprints.Vertex; +import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.RequestContext; import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.RepositoryException; import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.atlas.typesystem.ITypedInstance; import org.apache.atlas.typesystem.ITypedReferenceableInstance; @@ -63,18 +65,48 @@ public final class GraphHelper { private static final Logger LOG = LoggerFactory.getLogger(GraphHelper.class); public static final String EDGE_LABEL_PREFIX = "__"; + public static final String RETRY_COUNT = "atlas.graph.storage.num.retries"; + public static final String RETRY_DELAY = "atlas.graph.storage.retry.sleeptime.ms"; private static final TypeSystem typeSystem = TypeSystem.getInstance(); - private static final GraphHelper INSTANCE = new GraphHelper(TitanGraphProvider.getGraphInstance()); + private static volatile GraphHelper INSTANCE; private TitanGraph titanGraph; + private static int maxRetries; + public static long retrySleepTimeMillis; - private GraphHelper(TitanGraph titanGraph) { + @VisibleForTesting + GraphHelper(TitanGraph titanGraph) { this.titanGraph = titanGraph; + try { + maxRetries = ApplicationProperties.get().getInt(RETRY_COUNT, 3); + retrySleepTimeMillis = ApplicationProperties.get().getLong(RETRY_DELAY, 1000); + } catch (AtlasException e) { + LOG.error("Could not load configuration. Setting to default value for " + RETRY_COUNT, e); + } } public static GraphHelper getInstance() { + if ( INSTANCE == null) { + synchronized (GraphHelper.class) { + if (INSTANCE == null) { + INSTANCE = new GraphHelper(TitanGraphProvider.getGraphInstance()); + } + } + } + return INSTANCE; + } + + @VisibleForTesting + static GraphHelper getInstance(TitanGraph graph) { + if ( INSTANCE == null) { + synchronized (GraphHelper.class) { + if (INSTANCE == null) { + INSTANCE = new GraphHelper(graph); + } + } + } return INSTANCE; } @@ -130,19 +162,41 @@ public final class GraphHelper { return edge; } - public Edge getOrCreateEdge(Vertex outVertex, Vertex inVertex, String edgeLabel) { - Iterator<Edge> edges = GraphHelper.getAdjacentEdgesByLabel(inVertex, Direction.IN, edgeLabel); + public Edge getOrCreateEdge(Vertex outVertex, Vertex inVertex, String edgeLabel) throws RepositoryException { + for (int numRetries = 0; numRetries < maxRetries; numRetries++) { + try { + LOG.debug("Running edge creation attempt {}", numRetries); + Iterator<Edge> edges = getAdjacentEdgesByLabel(inVertex, Direction.IN, edgeLabel); + + while (edges.hasNext()) { + Edge edge = edges.next(); + if (edge.getVertex(Direction.OUT).getId().toString().equals(outVertex.getId().toString())) { + Id.EntityState edgeState = getState(edge); + if (edgeState == null || edgeState == Id.EntityState.ACTIVE) { + return edge; + } + } + } + + return addEdge(outVertex, inVertex, edgeLabel); + } catch (Exception e) { + LOG.warn(String.format("Exception while trying to create edge from %s to %s with label %s. Retrying", + vertexString(outVertex), vertexString(inVertex), edgeLabel), e); + if (numRetries == (maxRetries - 1)) { + LOG.error("Max retries exceeded for edge creation {} {} {} ", outVertex, inVertex, edgeLabel, e); + throw new RepositoryException("Edge creation failed after retries", e); + } - while (edges.hasNext()) { - Edge edge = edges.next(); - if (edge.getVertex(Direction.OUT).getId().toString().equals(outVertex.getId().toString())) { - Id.EntityState edgeState = getState(edge); - if (edgeState == null || edgeState == Id.EntityState.ACTIVE) { - return edge; + try { + LOG.info("Retrying with delay of {} ms ", retrySleepTimeMillis); + Thread.sleep(retrySleepTimeMillis); + } catch(InterruptedException ie) { + LOG.warn("Retry interrupted during edge creation "); + throw new RepositoryException("Retry interrupted during edge creation", ie); } } } - return addEdge(outVertex, inVertex, edgeLabel); + return null; } @@ -197,7 +251,7 @@ public final class GraphHelper { //In some cases of parallel APIs, the edge is added, but get edge by label doesn't return the edge. ATLAS-1104 //So traversing all the edges - public static Iterator<Edge> getAdjacentEdgesByLabel(Vertex instanceVertex, Direction direction, final String edgeLabel) { + public Iterator<Edge> getAdjacentEdgesByLabel(Vertex instanceVertex, Direction direction, final String edgeLabel) { LOG.debug("Finding edges for {} with label {}", string(instanceVertex), edgeLabel); if(instanceVertex != null && edgeLabel != null) { final Iterator<Edge> iterator = instanceVertex.getEdges(direction).iterator(); @@ -234,7 +288,7 @@ public final class GraphHelper { return null; } - public static Iterator<Edge> getOutGoingEdgesByLabel(Vertex instanceVertex, String edgeLabel) { + public Iterator<Edge> getOutGoingEdgesByLabel(Vertex instanceVertex, String edgeLabel) { return getAdjacentEdgesByLabel(instanceVertex, Direction.OUT, edgeLabel); } @@ -245,8 +299,8 @@ public final class GraphHelper { * @param edgeLabel * @return */ - public static Edge getEdgeForLabel(Vertex vertex, String edgeLabel) { - Iterator<Edge> iterator = GraphHelper.getAdjacentEdgesByLabel(vertex, Direction.OUT, edgeLabel); + public Edge getEdgeForLabel(Vertex vertex, String edgeLabel) { + Iterator<Edge> iterator = getAdjacentEdgesByLabel(vertex, Direction.OUT, edgeLabel); Edge latestDeletedEdge = null; long latestDeletedEdgeTime = Long.MIN_VALUE; @@ -525,4 +579,4 @@ public final class GraphHelper { } return key; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/59477783/repository/src/main/java/org/apache/atlas/repository/graph/GraphToTypedInstanceMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphToTypedInstanceMapper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphToTypedInstanceMapper.java index 5fbe46b..5c7cb2e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphToTypedInstanceMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphToTypedInstanceMapper.java @@ -163,7 +163,7 @@ public final class GraphToTypedInstanceMapper { Edge edge; if (edgeId == null) { - edge = GraphHelper.getEdgeForLabel(instanceVertex, relationshipLabel); + edge = graphHelper.getEdgeForLabel(instanceVertex, relationshipLabel); } else { edge = graphHelper.getEdgeByEdgeId(instanceVertex, relationshipLabel, edgeId); } @@ -269,7 +269,7 @@ public final class GraphToTypedInstanceMapper { Edge edge; if (edgeId == null) { - edge = GraphHelper.getEdgeForLabel(instanceVertex, relationshipLabel); + edge = graphHelper.getEdgeForLabel(instanceVertex, relationshipLabel); } else { edge = graphHelper.getEdgeByEdgeId(instanceVertex, relationshipLabel, edgeId); } @@ -296,7 +296,7 @@ public final class GraphToTypedInstanceMapper { TraitType traitType, ITypedStruct traitInstance) throws AtlasException { String relationshipLabel = GraphHelper.getTraitLabel(typedInstanceTypeName, traitName); LOG.debug("Finding edge for {} -> label {} ", instanceVertex, relationshipLabel); - Iterator<Edge> edgeIterator = GraphHelper.getOutGoingEdgesByLabel(instanceVertex, relationshipLabel); + Iterator<Edge> edgeIterator = graphHelper.getOutGoingEdgesByLabel(instanceVertex, relationshipLabel); while (edgeIterator.hasNext()) { Edge edge = edgeIterator.next(); final Vertex traitInstanceVertex = edge.getVertex(Direction.IN); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/59477783/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java index 0512489..2e0414e 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java @@ -211,9 +211,9 @@ public final class TypedInstanceToGraphMapper { case STRUCT: case CLASS: - String edgeLabel = GraphHelper.getEdgeLabel(typedInstance, attributeInfo); + String edgeLabel = graphHelper.getEdgeLabel(typedInstance, attributeInfo); - Edge currentEdge = GraphHelper.getEdgeForLabel(instanceVertex, edgeLabel); + Edge currentEdge = graphHelper.getEdgeForLabel(instanceVertex, edgeLabel); String newEdgeId = addOrUpdateReference(instanceVertex, attributeInfo, attributeInfo.dataType(), attrValue, currentEdge, edgeLabel, operation); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/59477783/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java b/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java index 4530cac..a94d157 100755 --- a/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java +++ b/repository/src/main/java/org/apache/atlas/repository/typestore/GraphBackedTypeStore.java @@ -282,7 +282,7 @@ public class GraphBackedTypeStore implements ITypeStore { private ImmutableSet<String> getSuperTypes(Vertex vertex) { Set<String> superTypes = new HashSet<>(); - Iterator<Edge> edges = GraphHelper.getOutGoingEdgesByLabel(vertex, SUPERTYPE_EDGE_LABEL); + Iterator<Edge> edges = graphHelper.getOutGoingEdgesByLabel(vertex, SUPERTYPE_EDGE_LABEL); while (edges.hasNext()) { Edge edge = edges.next(); superTypes.add((String) edge.getVertex(Direction.IN).getProperty(Constants.TYPENAME_PROPERTY_KEY)); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/59477783/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperMockTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperMockTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperMockTest.java new file mode 100644 index 0000000..5ebc2f7 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperMockTest.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.graph; + +import com.thinkaurelius.titan.core.TitanGraph; +import com.thinkaurelius.titan.core.TitanVertex; +import com.tinkerpop.blueprints.Direction; +import com.tinkerpop.blueprints.Edge; +import org.apache.atlas.repository.RepositoryException; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.Iterator; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; + +public class GraphHelperMockTest { + + private GraphHelper graphHelperInstance; + + private TitanGraph graph; + + @BeforeClass + public void setup() { + MockitoAnnotations.initMocks(this); + graph = mock(TitanGraph.class); + graphHelperInstance = GraphHelper.getInstance(graph); + } + + @Test(expectedExceptions = RepositoryException.class) + public void testGetOrCreateEdgeLabelWithMaxRetries() throws Exception { + final String edgeLabel = "testLabel"; + TitanVertex v1 = mock(TitanVertex.class); + TitanVertex v2 = mock(TitanVertex.class); + + Iterable noEdgesIterable = new Iterable<Edge>() { + @Override + public Iterator<Edge> iterator() { + return new Iterator<Edge>() { + @Override + public boolean hasNext() { + return false; + } + + @Override + public Edge next() { + return null; + } + + @Override + public void remove() { + } + }; + } + }; + when(v2.getEdges(Direction.IN)).thenReturn(noEdgesIterable); + when(v1.getEdges(Direction.OUT)).thenReturn(noEdgesIterable); + + when(v1.getId()).thenReturn(new String("1234")); + when(v2.getId()).thenReturn(new String("5678")); + when(graph.addEdge(null, v1, v2, edgeLabel)).thenThrow(new RuntimeException("Unique property constraint violated")); + graphHelperInstance.getOrCreateEdge(v1, v2, edgeLabel); + } + + @Test + public void testGetOrCreateEdgeLabelWithRetries() throws Exception { + final String edgeLabel = "testLabel"; + TitanVertex v1 = mock(TitanVertex.class); + TitanVertex v2 = mock(TitanVertex.class); + Edge edge = mock(Edge.class); + + Iterable noEdgesIterable = new Iterable<Edge>() { + @Override + public Iterator<Edge> iterator() { + return new Iterator<Edge>() { + @Override + public boolean hasNext() { + return false; + } + + @Override + public Edge next() { + return null; + } + + @Override + public void remove() { + } + }; + } + }; + when(v2.getEdges(Direction.IN)).thenReturn(noEdgesIterable); + when(v1.getEdges(Direction.OUT)).thenReturn(noEdgesIterable); + + when(v1.getId()).thenReturn(new String("v1")); + when(v2.getId()).thenReturn(new String("v2")); + when(edge.getId()).thenReturn(new String("edge")); + when(graph.addEdge(null, v1, v2, edgeLabel)) + .thenThrow(new RuntimeException("Unique property constraint violated")).thenReturn(edge); + Edge redge = graphHelperInstance.getOrCreateEdge(v1, v2, edgeLabel); + assertEquals(edge, redge); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/59477783/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperTest.java index 428846f..db82de6 100644 --- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphHelperTest.java @@ -75,7 +75,7 @@ public class GraphHelperTest { v1.addEdge("l1", v2); v1.addEdge("l2", v2); - Iterator<Edge> iterator = GraphHelper.getOutGoingEdgesByLabel(v1, "l1"); + Iterator<Edge> iterator = GraphHelper.getInstance().getOutGoingEdgesByLabel(v1, "l1"); assertTrue(iterator.hasNext()); assertTrue(iterator.hasNext()); assertNotNull(iterator.next()); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/59477783/repository/src/test/java/org/apache/atlas/repository/typestore/GraphBackedTypeStoreTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/typestore/GraphBackedTypeStoreTest.java b/repository/src/test/java/org/apache/atlas/repository/typestore/GraphBackedTypeStoreTest.java index 789c0fe..90e622a 100755 --- a/repository/src/test/java/org/apache/atlas/repository/typestore/GraphBackedTypeStoreTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/typestore/GraphBackedTypeStoreTest.java @@ -223,7 +223,7 @@ public class GraphBackedTypeStoreTest { private int countOutgoingEdges(Vertex typeVertex, String edgeLabel) { - Iterator<Edge> outGoingEdgesByLabel = GraphHelper.getOutGoingEdgesByLabel(typeVertex, edgeLabel); + Iterator<Edge> outGoingEdgesByLabel = GraphHelper.getInstance().getOutGoingEdgesByLabel(typeVertex, edgeLabel); int edgeCount = 0; for (Iterator<Edge> iterator = outGoingEdgesByLabel; iterator.hasNext();) { iterator.next();
