Repository: atlas Updated Branches: refs/heads/0.8-incubating bc2d39d10 -> de60a6560
ATLAS-1988: added REST API to search for related entities Signed-off-by: Madhan Neethiraj <mad...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/de60a656 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/de60a656 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/de60a656 Branch: refs/heads/0.8-incubating Commit: de60a6560113e311cc46a773bedd4071c7d413fe Parents: bc2d39d Author: Sarath Subramanian <ssubraman...@hortonworks.com> Authored: Tue Jul 18 23:09:19 2017 -0700 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Wed Jul 26 00:36:31 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/atlas/AtlasErrorCode.java | 2 + .../main/java/org/apache/atlas/SortOrder.java | 22 +++ .../model/discovery/AtlasSearchResult.java | 2 +- .../atlas/discovery/AtlasDiscoveryService.java | 13 ++ .../atlas/discovery/EntityDiscoveryService.java | 134 +++++++++++++++++++ .../store/graph/v1/EntityGraphRetriever.java | 2 +- .../atlas/util/AtlasGremlin2QueryProvider.java | 6 + .../atlas/util/AtlasGremlinQueryProvider.java | 5 +- .../apache/atlas/web/rest/DiscoveryREST.java | 39 ++++++ .../NotificationHookConsumerKafkaTest.java | 26 +++- 10 files changed, 246 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/de60a656/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java index d723b2a..f267a33 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java +++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java @@ -71,6 +71,8 @@ public enum AtlasErrorCode { BAD_REQUEST(400, "ATLAS-400-00-029", "{0}"), PARAMETER_PARSING_FAILED(400, "ATLAS-400-00-02A", "Parameter parsing failed at: {0}"), MISSING_MANDATORY_ATTRIBUTE(400, "ATLAS-400-00-02B", "Mandatory field {0}.{1} has empty/null value"), + INVALID_RELATIONSHIP_ATTRIBUTE(400, "ATLAS-400-00-02C", "Expected attribute {0} to be a relationship but found type {}"), + INVALID_RELATIONSHIP_TYPE(400, "ATLAS-400-00-02D", "Invalid entity type '{0}', guid '{1}' in relationship search"), // All Not found enums go here TYPE_NAME_NOT_FOUND(404, "ATLAS-404-00-001", "Given typename {0} was invalid"), http://git-wip-us.apache.org/repos/asf/atlas/blob/de60a656/intg/src/main/java/org/apache/atlas/SortOrder.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/SortOrder.java b/intg/src/main/java/org/apache/atlas/SortOrder.java new file mode 100644 index 0000000..e3eef4e --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/SortOrder.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas; + +public enum SortOrder { + ASCENDING, DESCENDING +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/de60a656/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java index 5827440..0c32e01 100644 --- a/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java +++ b/intg/src/main/java/org/apache/atlas/model/discovery/AtlasSearchResult.java @@ -185,7 +185,7 @@ public class AtlasSearchResult implements Serializable { '}'; } - public enum AtlasQueryType { DSL, FULL_TEXT, GREMLIN, BASIC, ATTRIBUTE } + public enum AtlasQueryType { DSL, FULL_TEXT, GREMLIN, BASIC, ATTRIBUTE, RELATIONSHIP } @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) http://git-wip-us.apache.org/repos/asf/atlas/blob/de60a656/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java index 764b548..ead5d3c 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/AtlasDiscoveryService.java @@ -19,6 +19,7 @@ package org.apache.atlas.discovery; +import org.apache.atlas.SortOrder; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.discovery.AtlasSearchResult; import org.apache.atlas.model.discovery.SearchParameters; @@ -65,4 +66,16 @@ public interface AtlasDiscoveryService { * @throws AtlasBaseException */ AtlasSearchResult searchWithParameters(SearchParameters searchParameters) throws AtlasBaseException; + + /** + * + * @param guid unique ID of the entity. + * @param relation relation name. + * @param sortByAttribute sort the result using this attribute name, default value is 'name' + * @param sortOrder sorting order + * @param limit number of resultant rows (for pagination). [ limit > 0 ] and [ limit < maxlimit ]. -1 maps to atlas.search.defaultlimit property. + * @param offset offset to the results returned (for pagination). [ offset >= 0 ]. -1 maps to offset 0. + * @return AtlasSearchResult + */ + AtlasSearchResult searchRelatedEntities(String guid, String relation, String sortByAttribute, SortOrder sortOrder, int limit, int offset) throws AtlasBaseException; } http://git-wip-us.apache.org/repos/asf/atlas/blob/de60a656/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java index b183c72..2a24782 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java @@ -21,6 +21,7 @@ import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; +import org.apache.atlas.SortOrder; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy; import org.apache.atlas.exception.AtlasBaseException; @@ -67,6 +68,7 @@ import scala.util.Either; import scala.util.parsing.combinator.Parsers.NoSuccess; import javax.inject.Inject; +import javax.script.Bindings; import javax.script.ScriptEngine; import javax.script.ScriptException; import java.util.*; @@ -74,10 +76,19 @@ import java.util.*; import static org.apache.atlas.AtlasErrorCode.CLASSIFICATION_NOT_FOUND; import static org.apache.atlas.AtlasErrorCode.DISCOVERY_QUERY_FAILED; import static org.apache.atlas.AtlasErrorCode.UNKNOWN_TYPENAME; +import static org.apache.atlas.SortOrder.DESCENDING; +import static org.apache.atlas.model.TypeCategory.ARRAY; +import static org.apache.atlas.model.TypeCategory.MAP; +import static org.apache.atlas.model.TypeCategory.OBJECT_ID_TYPE; +import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX; +import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.RELATIONSHIP_SEARCH; +import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.RELATIONSHIP_SEARCH_DESCENDING_SORT; +import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.RELATIONSHIP_SEARCH_ASCENDING_SORT; @Component public class EntityDiscoveryService implements AtlasDiscoveryService { private static final Logger LOG = LoggerFactory.getLogger(EntityDiscoveryService.class); + private static final String DEFAULT_SORT_ATTRIBUTE_NAME = "name"; private final AtlasGraph graph; private final DefaultGraphPersistenceStrategy graphPersistenceStrategy; @@ -485,6 +496,98 @@ public class EntityDiscoveryService implements AtlasDiscoveryService { return ret; } + @Override + @GraphTransaction + public AtlasSearchResult searchRelatedEntities(String guid, String relation, String sortByAttributeName, + SortOrder sortOrder, int limit, int offset) throws AtlasBaseException { + AtlasSearchResult ret = new AtlasSearchResult(AtlasQueryType.RELATIONSHIP); + + if (StringUtils.isEmpty(guid) || StringUtils.isEmpty(relation)) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_PARAMETERS, "guid: '" + guid + "', relation: '" + relation + "'"); + } + + AtlasVertex entityVertex = entityRetriever.getEntityVertex(guid); + String entityTypeName = GraphHelper.getTypeName(entityVertex); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entityTypeName); + + if (entityType == null) { + throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_TYPE, entityTypeName, guid); + } + + AtlasAttribute attribute = entityType.getAttribute(relation); + + if (attribute != null) { + if (isRelationshipAttribute(attribute)) { + relation = EDGE_LABEL_PREFIX + attribute.getQualifiedName(); + } else { + throw new AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_ATTRIBUTE, relation, attribute.getTypeName()); + } + } + + if (StringUtils.isEmpty(sortByAttributeName)) { + sortByAttributeName = DEFAULT_SORT_ATTRIBUTE_NAME; + } + + AtlasAttribute sortByAttribute = entityType.getAttribute(sortByAttributeName); + + if (sortByAttribute == null) { + sortByAttributeName = null; + sortOrder = null; + } else { + sortByAttributeName = sortByAttribute.getQualifiedName(); + + if (sortOrder == null) { + sortOrder = SortOrder.ASCENDING; + } + } + + String relatedEntitiesQuery = getRelatedEntitiesQuery(sortOrder); + ScriptEngine scriptEngine = graph.getGremlinScriptEngine(); + Bindings bindings = scriptEngine.createBindings(); + QueryParams params = validateSearchParams(limit, offset); + + bindings.put("g", graph); + bindings.put("guid", guid); + bindings.put("relation", relation); + bindings.put("sortAttributeName", sortByAttributeName); + bindings.put("offset", params.offset()); + bindings.put("limit", params.offset() + params.limit()); + + try { + Object result = graph.executeGremlinScript(scriptEngine, bindings, relatedEntitiesQuery, false); + + if (result instanceof List && CollectionUtils.isNotEmpty((List) result)) { + List<?> queryResult = (List) result; + Object firstElement = queryResult.get(0); + + if (firstElement instanceof AtlasVertex) { + List<AtlasVertex> vertices = (List<AtlasVertex>) queryResult; + List<AtlasEntityHeader> resultList = new ArrayList<>(vertices.size()); + + for (AtlasVertex vertex : vertices) { + resultList.add(entityRetriever.toAtlasEntityHeader(vertex)); + } + + ret.setEntities(resultList); + } + } + + if (ret.getEntities() == null) { + ret.setEntities(new ArrayList<AtlasEntityHeader>()); + } + } catch (ScriptException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Gremlin script execution failed for relationship search query: " + e); + } + + throw new AtlasBaseException(AtlasErrorCode.INTERNAL_ERROR, "Relationship search query failed"); + } finally { + graph.releaseGremlinScriptEngine(scriptEngine); + } + + return ret; + } + public int getMaxResultSetSize() { return maxResultSetSize; } @@ -640,4 +743,35 @@ public class EntityDiscoveryService implements AtlasDiscoveryService { return ""; } + + private boolean isRelationshipAttribute(AtlasAttribute attribute) throws AtlasBaseException { + boolean ret = true; + AtlasType attrType = attribute.getAttributeType(); + + if (attrType.getTypeCategory() == ARRAY) { + attrType = ((AtlasArrayType) attrType).getElementType(); + } else if (attrType.getTypeCategory() == MAP) { + attrType = ((AtlasMapType) attrType).getValueType(); + } + + if (attrType.getTypeCategory() != OBJECT_ID_TYPE) { + ret = false; + } + + return ret; + } + + private String getRelatedEntitiesQuery(SortOrder sortOrder) { + final String ret; + + if (sortOrder == null) { + ret = gremlinQueryProvider.getQuery(RELATIONSHIP_SEARCH); + } else if (sortOrder == DESCENDING) { + ret = gremlinQueryProvider.getQuery(RELATIONSHIP_SEARCH_DESCENDING_SORT); + } else { + ret = gremlinQueryProvider.getQuery(RELATIONSHIP_SEARCH_ASCENDING_SORT); + } + + return ret; + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/de60a656/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java index b8e0227..f6c8c72 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java @@ -131,7 +131,7 @@ public final class EntityGraphRetriever { return toAtlasEntityHeader(entityVertex, Collections.<String>emptySet()); } - private AtlasVertex getEntityVertex(String guid) throws AtlasBaseException { + public AtlasVertex getEntityVertex(String guid) throws AtlasBaseException { AtlasVertex ret = AtlasGraphUtilsV1.findByGuid(guid); if (ret == null) { http://git-wip-us.apache.org/repos/asf/atlas/blob/de60a656/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java index 1bf0346..a61bb66 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlin2QueryProvider.java @@ -75,6 +75,12 @@ public class AtlasGremlin2QueryProvider extends AtlasGremlinQueryProvider { return " [startIdx..<endIdx].toList()"; case GUID_PREFIX_FILTER: return ".filter{it.'__guid'.matches(guid)}"; + case RELATIONSHIP_SEARCH: + return "g.V('__guid', guid).both(relation)[offset..<limit].toList()"; + case RELATIONSHIP_SEARCH_DESCENDING_SORT: + return "g.V('__guid', guid).both(relation)[offset..<limit].order{it.b.getProperty(sortAttributeName) <=> it.a.getProperty(sortAttributeName)}.toList()"; + case RELATIONSHIP_SEARCH_ASCENDING_SORT: + return "g.V('__guid', guid).both(relation)[offset..<limit].order{it.a.getProperty(sortAttributeName) <=> it.b.getProperty(sortAttributeName)}.toList()"; } // Should never reach this point return null; http://git-wip-us.apache.org/repos/asf/atlas/blob/de60a656/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java index 8481a4f..def4733 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasGremlinQueryProvider.java @@ -60,6 +60,9 @@ public abstract class AtlasGremlinQueryProvider { BASIC_SEARCH_CLASSIFICATION_FILTER, BASIC_SEARCH_STATE_FILTER, TO_RANGE_LIST, - GUID_PREFIX_FILTER + GUID_PREFIX_FILTER, + RELATIONSHIP_SEARCH, + RELATIONSHIP_SEARCH_ASCENDING_SORT, + RELATIONSHIP_SEARCH_DESCENDING_SORT } } http://git-wip-us.apache.org/repos/asf/atlas/blob/de60a656/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java index efab72a..86f618c 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java @@ -18,6 +18,7 @@ package org.apache.atlas.web.rest; import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.SortOrder; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.discovery.AtlasDiscoveryService; import org.apache.atlas.model.discovery.AtlasSearchResult; @@ -255,6 +256,44 @@ public class DiscoveryREST { } } + /** + * Relationship search to search for related entities satisfying the search parameters + * @param guid Attribute name + * @param relation relationName + * @param sortByAttribute sort the result using this attribute name, default value is 'name' + * @param sortOrder sorting order + * @param limit limit the result set to only include the specified number of entries + * @param offset start offset of the result set (useful for pagination) + * @return Atlas search result + * @throws AtlasBaseException + * + * @HTTP 200 On successful search + * @HTTP 400 guid is not a valid entity type or attributeName is not a valid relationship attribute + */ + @GET + @Path("relationship") + @Consumes(Servlets.JSON_MEDIA_TYPE) + @Produces(Servlets.JSON_MEDIA_TYPE) + public AtlasSearchResult searchRelatedEntities(@QueryParam("guid") String guid, + @QueryParam("relation") String relation, + @QueryParam("sortBy") String sortByAttribute, + @QueryParam("sortOrder") SortOrder sortOrder, + @QueryParam("limit") int limit, + @QueryParam("offset") int offset) throws AtlasBaseException { + AtlasPerfTracer perf = null; + + try { + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DiscoveryREST.relatedEntitiesSearchUsingGremlin(" + guid + + ", " + relation + ", " + sortByAttribute + ", " + sortOrder + ", " + limit + ", " + offset + ")"); + } + + return atlasDiscoveryService.searchRelatedEntities(guid, relation, sortByAttribute, sortOrder, limit, offset); + } finally { + AtlasPerfTracer.log(perf); + } + } + private boolean isEmpty(SearchParameters.FilterCriteria filterCriteria) { return filterCriteria == null || (StringUtils.isEmpty(filterCriteria.getAttributeName()) && CollectionUtils.isEmpty(filterCriteria.getCriterion())); http://git-wip-us.apache.org/repos/asf/atlas/blob/de60a656/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java index eb37fa8..1a3c413 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java @@ -100,8 +100,30 @@ public class NotificationHookConsumerKafkaTest { produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity())); NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, false); - NotificationHookConsumer notificationHookConsumer = - new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); + + consumeOneMessage(consumer, hookConsumer); + verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean()); + + // produce another message, and make sure it moves ahead. If commit succeeded, this would work. + produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity())); + consumeOneMessage(consumer, hookConsumer); + verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean()); + reset(atlasEntityStore); + } + finally { + kafkaNotification.close(); + } + } + + @Test + public void testConsumerConsumesNewMessageWithAutoCommitDisabled1() throws AtlasException, InterruptedException, AtlasBaseException { + try { + produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity())); + + NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, false); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); consumeOneMessage(consumer, hookConsumer);