Repository: incubator-atlas Updated Branches: refs/heads/master 5527afb0d -> f74e43c2b
ATLAS-1961: Basic search improvement in use of Solr index for attribute filtering (# 3) Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/f74e43c2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/f74e43c2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/f74e43c2 Branch: refs/heads/master Commit: f74e43c2bc3fde30cb1d707f1e7c08b8770ec67e Parents: 5527afb Author: Madhan Neethiraj <mad...@apache.org> Authored: Wed Jul 19 11:06:42 2017 -0700 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Wed Jul 19 17:42:30 2017 -0700 ---------------------------------------------------------------------- .../ClassificationSearchProcessor.java | 45 +++++----- .../atlas/discovery/EntitySearchProcessor.java | 62 +++++++------ .../discovery/FullTextSearchProcessor.java | 38 +++++--- .../apache/atlas/discovery/SearchProcessor.java | 91 ++++++++++++-------- .../store/graph/v1/AtlasGraphUtilsV1.java | 4 + 5 files changed, 145 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f74e43c2/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java index 77b2c7c..b6e0de5 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java +++ b/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java @@ -58,7 +58,7 @@ public class ClassificationSearchProcessor extends SearchProcessor { if (useSolrSearch) { StringBuilder solrQuery = new StringBuilder(); - constructTypeTestQuery(solrQuery, typeAndSubTypes); + constructTypeTestQuery(solrQuery, classificationType, typeAndSubTypes); constructFilterQuery(solrQuery, classificationType, filterCriteria, solrAttributes); String solrQueryString = STRAY_AND_PATTERN.matcher(solrQuery).replaceAll(")"); @@ -95,20 +95,26 @@ public class ClassificationSearchProcessor extends SearchProcessor { } try { - int qryOffset = (nextProcessor == null) ? context.getSearchParameters().getOffset() : 0; - int limit = context.getSearchParameters().getLimit(); - int resultIdx = qryOffset; - Set<String> processedGuids = new HashSet<>(); + final int startIdx = context.getSearchParameters().getOffset(); + final int limit = context.getSearchParameters().getLimit(); + int qryOffset = nextProcessor == null ? startIdx : 0; + int resultIdx = qryOffset; + + final Set<String> processedGuids = new HashSet<>(); + final List<AtlasVertex> entityVertices = new ArrayList<>(); + final List<AtlasVertex> classificationVertices = new ArrayList<>(); + + + for (; ret.size() < limit; qryOffset += limit) { + entityVertices.clear(); + classificationVertices.clear(); - while (ret.size() < limit) { if (context.terminateSearch()) { LOG.warn("query terminated: {}", context.getSearchParameters()); break; } - List<AtlasVertex> classificationVertices; - if (indexQuery != null) { Iterator<AtlasIndexQuery.Result> queryResult = indexQuery.vertices(qryOffset, limit); @@ -116,7 +122,7 @@ public class ClassificationSearchProcessor extends SearchProcessor { break; } - classificationVertices = getVerticesFromIndexQueryResult(queryResult); + getVerticesFromIndexQueryResult(queryResult, classificationVertices); } else { Iterator<AtlasVertex> queryResult = allGraphQuery.vertices(qryOffset, limit).iterator(); @@ -124,13 +130,9 @@ public class ClassificationSearchProcessor extends SearchProcessor { break; } - classificationVertices = getVertices(queryResult); + getVertices(queryResult, classificationVertices); } - qryOffset += limit; - - List<AtlasVertex> entityVertices = new ArrayList<>(); - for (AtlasVertex classificationVertex : classificationVertices) { Iterable<AtlasEdge> edges = classificationVertex.getEdges(AtlasEdgeDirection.IN); @@ -148,12 +150,12 @@ public class ClassificationSearchProcessor extends SearchProcessor { } } - entityVertices = super.filter(entityVertices); + super.filter(entityVertices); for (AtlasVertex entityVertex : entityVertices) { resultIdx++; - if (resultIdx < context.getSearchParameters().getOffset()) { + if (resultIdx <= startIdx) { continue; } @@ -176,7 +178,7 @@ public class ClassificationSearchProcessor extends SearchProcessor { } @Override - public List<AtlasVertex> filter(List<AtlasVertex> entityVertices) { + public void filter(List<AtlasVertex> entityVertices) { if (LOG.isDebugEnabled()) { LOG.debug("==> ClassificationSearchProcessor.filter({})", entityVertices.size()); } @@ -185,14 +187,13 @@ public class ClassificationSearchProcessor extends SearchProcessor { query.addConditionsFrom(filterGraphQuery); - List<AtlasVertex> ret = getVertices(query.vertices().iterator()); + entityVertices.clear(); + getVertices(query.vertices().iterator(), entityVertices); - ret = super.filter(ret); + super.filter(entityVertices); if (LOG.isDebugEnabled()) { - LOG.debug("<== ClassificationSearchProcessor.filter({}): ret.size()={}", entityVertices.size(), ret.size()); + LOG.debug("<== ClassificationSearchProcessor.filter(): ret.size()={}", entityVertices.size()); } - - return ret; } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f74e43c2/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java index 50376ef..6f629eb 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntitySearchProcessor.java @@ -20,6 +20,7 @@ package org.apache.atlas.discovery; import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria; import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.graphdb.*; +import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1; import org.apache.atlas.type.AtlasClassificationType; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.utils.AtlasPerfTracer; @@ -59,7 +60,7 @@ public class EntitySearchProcessor extends SearchProcessor { StringBuilder solrQuery = new StringBuilder(); if (typeSearchBySolr) { - constructTypeTestQuery(solrQuery, typeAndSubTypes); + constructTypeTestQuery(solrQuery, entityType, typeAndSubTypes); } if (attrSearchBySolr) { @@ -127,34 +128,48 @@ public class EntitySearchProcessor extends SearchProcessor { } try { - int qryOffset = (nextProcessor == null && (graphQuery == null || indexQuery == null)) ? context.getSearchParameters().getOffset() : 0; - int limit = context.getSearchParameters().getLimit(); - int resultIdx = qryOffset; + final int startIdx = context.getSearchParameters().getOffset(); + final int limit = context.getSearchParameters().getLimit(); + int qryOffset = (nextProcessor == null && (graphQuery == null || indexQuery == null)) ? startIdx : 0; + int resultIdx = qryOffset; + + final List<AtlasVertex> entityVertices = new ArrayList<>(); + + for (; ret.size() < limit; qryOffset += limit) { + entityVertices.clear(); - while (ret.size() < limit) { if (context.terminateSearch()) { LOG.warn("query terminated: {}", context.getSearchParameters()); break; } - List<AtlasVertex> vertices; - if (indexQuery != null) { - Iterator<AtlasIndexQuery.Result> queryResult = indexQuery.vertices(qryOffset, limit); + Iterator<AtlasIndexQuery.Result> idxQueryResult = indexQuery.vertices(qryOffset, limit); - if (!queryResult.hasNext()) { // no more results from solr - end of search + if (!idxQueryResult.hasNext()) { // no more results from solr - end of search break; } - vertices = getVerticesFromIndexQueryResult(queryResult); + while (idxQueryResult.hasNext()) { + AtlasVertex vertex = idxQueryResult.next().getVertex(); + + // skip non-entity vertices + if (!AtlasGraphUtilsV1.isEntityVertex(vertex)) { + LOG.warn("EntitySearchProcessor.execute(): ignoring non-entity vertex (id={})", vertex.getId()); // might cause duplicate entries in result + + continue; + } + + entityVertices.add(vertex); + } if (graphQuery != null) { - AtlasGraphQuery guidQuery = context.getGraph().query().in(Constants.GUID_PROPERTY_KEY, getGuids(vertices)); + AtlasGraphQuery guidQuery = context.getGraph().query().in(Constants.GUID_PROPERTY_KEY, getGuids(entityVertices)); guidQuery.addConditionsFrom(graphQuery); - vertices = getVertices(guidQuery.vertices().iterator()); + getVertices(guidQuery.vertices().iterator(), entityVertices); } } else { Iterator<AtlasVertex> queryResult = graphQuery.vertices(qryOffset, limit).iterator(); @@ -163,21 +178,19 @@ public class EntitySearchProcessor extends SearchProcessor { break; } - vertices = getVertices(queryResult); + getVertices(queryResult, entityVertices); } - qryOffset += limit; - - vertices = super.filter(vertices); + super.filter(entityVertices); - for (AtlasVertex vertex : vertices) { + for (AtlasVertex entityVertex : entityVertices) { resultIdx++; - if (resultIdx < context.getSearchParameters().getOffset()) { + if (resultIdx <= startIdx) { continue; } - ret.add(vertex); + ret.add(entityVertex); if (ret.size() == limit) { break; @@ -196,7 +209,7 @@ public class EntitySearchProcessor extends SearchProcessor { } @Override - public List<AtlasVertex> filter(List<AtlasVertex> entityVertices) { + public void filter(List<AtlasVertex> entityVertices) { if (LOG.isDebugEnabled()) { LOG.debug("==> EntitySearchProcessor.filter({})", entityVertices.size()); } @@ -205,14 +218,13 @@ public class EntitySearchProcessor extends SearchProcessor { query.addConditionsFrom(filterGraphQuery); - List<AtlasVertex> ret = getVertices(query.vertices().iterator()); + entityVertices.clear(); + getVertices(query.vertices().iterator(), entityVertices); - ret = super.filter(ret); + super.filter(entityVertices); if (LOG.isDebugEnabled()) { - LOG.debug("<== EntitySearchProcessor.filter({}): ret.size()={}", entityVertices.size(), ret.size()); + LOG.debug("<== EntitySearchProcessor.filter(): ret.size()={}", entityVertices.size()); } - - return ret; } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f74e43c2/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java index 83368c2..22d91e0 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java +++ b/repository/src/main/java/org/apache/atlas/discovery/FullTextSearchProcessor.java @@ -19,8 +19,10 @@ package org.apache.atlas.discovery; import org.apache.atlas.model.discovery.SearchParameters; import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasIndexQuery; import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1; import org.apache.atlas.utils.AtlasPerfTracer; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -68,7 +70,7 @@ public class FullTextSearchProcessor extends SearchProcessor { queryString.append(AND_STR).append("(").append(StringUtils.join(typeAndSubTypeNames, SPACE_STRING)).append(")"); } else { LOG.warn("'{}' has too many subtypes ({}) to include in index-query; might cause poor performance", - context.getEntityType().getTypeName(), typeAndSubTypeNames.size()); + context.getClassificationType().getTypeName(), typeAndSubTypeNames.size()); } } @@ -92,11 +94,16 @@ public class FullTextSearchProcessor extends SearchProcessor { } try { - int qryOffset = nextProcessor == null ? context.getSearchParameters().getOffset() : 0; - int limit = context.getSearchParameters().getLimit(); - int resultIdx = qryOffset; + final int startIdx = context.getSearchParameters().getOffset(); + final int limit = context.getSearchParameters().getLimit(); + int qryOffset = nextProcessor == null ? startIdx : 0; + int resultIdx = qryOffset; + + final List<AtlasVertex> entityVertices = new ArrayList<>(); + + for (; ret.size() < limit; qryOffset += limit) { + entityVertices.clear(); - while (ret.size() < limit) { if (context.terminateSearch()) { LOG.warn("query terminated: {}", context.getSearchParameters()); @@ -109,20 +116,29 @@ public class FullTextSearchProcessor extends SearchProcessor { break; } - qryOffset += limit; + while (idxQueryResult.hasNext()) { + AtlasVertex vertex = idxQueryResult.next().getVertex(); - List<AtlasVertex> vertices = getVerticesFromIndexQueryResult(idxQueryResult); + // skip non-entity vertices + if (!AtlasGraphUtilsV1.isEntityVertex(vertex)) { + LOG.warn("FullTextSearchProcessor.execute(): ignoring non-entity vertex (id={})", vertex.getId()); // might cause duplicate entries in result + + continue; + } + + entityVertices.add(vertex); + } - vertices = super.filter(vertices); + super.filter(entityVertices); - for (AtlasVertex vertex : vertices) { + for (AtlasVertex entityVertex : entityVertices) { resultIdx++; - if (resultIdx < context.getSearchParameters().getOffset()) { + if (resultIdx <= startIdx) { continue; } - ret.add(vertex); + ret.add(entityVertex); if (ret.size() == limit) { break; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f74e43c2/repository/src/main/java/org/apache/atlas/discovery/SearchProcessor.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/SearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/SearchProcessor.java index 596b43b..2e75dfe 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/SearchProcessor.java +++ b/repository/src/main/java/org/apache/atlas/discovery/SearchProcessor.java @@ -50,6 +50,7 @@ public abstract class SearchProcessor { public static final String SPACE_STRING = " "; public static final String BRACE_OPEN_STR = "( "; public static final String BRACE_CLOSE_STR = " )"; + public static final char DOUBLE_QUOTE = '"'; private static final Map<SearchParameters.Operator, String> OPERATOR_MAP = new HashMap<>(); private static final char[] OFFENDING_CHARS = {'@', '/', ' '}; // This can grow as we discover corner cases @@ -87,8 +88,10 @@ public abstract class SearchProcessor { public abstract List<AtlasVertex> execute(); - public List<AtlasVertex> filter(List<AtlasVertex> entityVertices) { - return nextProcessor == null || CollectionUtils.isEmpty(entityVertices) ? entityVertices : nextProcessor.filter(entityVertices); + public void filter(List<AtlasVertex> entityVertices) { + if (nextProcessor != null && CollectionUtils.isNotEmpty(entityVertices)) { + nextProcessor.filter(entityVertices); + } } @@ -178,12 +181,26 @@ public abstract class SearchProcessor { return ret; } - protected void constructTypeTestQuery(StringBuilder solrQuery, Set<String> typeAndAllSubTypes) { + protected void constructTypeTestQuery(StringBuilder solrQuery, AtlasStructType type, Set<String> typeAndAllSubTypes) { String typeAndSubtypesString = StringUtils.join(typeAndAllSubTypes, SPACE_STRING); - solrQuery.append("v.\"").append(Constants.TYPE_NAME_PROPERTY_KEY).append("\": (") - .append(typeAndSubtypesString) - .append(")"); + if (CollectionUtils.isNotEmpty(typeAndAllSubTypes)) { + if (solrQuery.length() > 0) { + solrQuery.append(AND_STR); + } + + solrQuery.append("v.\"").append(Constants.TYPE_NAME_PROPERTY_KEY).append("\": (") + .append(typeAndSubtypesString) + .append(")"); + } + + if (type instanceof AtlasEntityType && context.getSearchParameters().getExcludeDeletedEntities()) { + if (solrQuery.length() > 0) { + solrQuery.append(AND_STR); + } + + solrQuery.append("v.\"").append(Constants.STATE_PROPERTY_KEY).append("\":ACTIVE"); + } } protected void constructFilterQuery(StringBuilder solrQuery, AtlasStructType type, FilterCriteria filterCriteria, Set<String> solrAttributes) { @@ -200,14 +217,6 @@ public abstract class SearchProcessor { solrQuery.append(filterQuery); } } - - if (type instanceof AtlasEntityType && context.getSearchParameters().getExcludeDeletedEntities()) { - if (solrQuery.length() > 0) { - solrQuery.append(AND_STR); - } - - solrQuery.append("v.\"").append(Constants.STATE_PROPERTY_KEY).append("\":ACTIVE"); - } } private String toSolrQuery(AtlasStructType type, FilterCriteria criteria, Set<String> solrAttributes, int level) { @@ -246,15 +255,10 @@ public abstract class SearchProcessor { String ret = EMPTY_STRING; try { - String qualifiedName = type.getQualifiedAttributeName(attrName); - if (OPERATOR_MAP.get(op) != null) { - if (hasOffendingChars(attrVal)) { - // FIXME: if attrVal has offending chars & op is contains, endsWith, startsWith, solr doesn't like it and results are skewed - ret = String.format(OPERATOR_MAP.get(op), qualifiedName, "\"" + attrVal + "\""); - } else { - ret = String.format(OPERATOR_MAP.get(op), qualifiedName, attrVal); - } + String qualifiedName = type.getQualifiedAttributeName(attrName); + + ret = String.format(OPERATOR_MAP.get(op), qualifiedName, escapeIndexQueryValue(attrVal)); } } catch (AtlasBaseException ex) { LOG.warn(ex.getMessage()); @@ -348,32 +352,28 @@ public abstract class SearchProcessor { private String getLikeRegex(String attributeValue) { return ".*" + attributeValue + ".*"; } - protected List<AtlasVertex> getVerticesFromIndexQueryResult(Iterator<AtlasIndexQuery.Result> idxQueryResult) { - List<AtlasVertex> ret = new ArrayList<>(); - + protected List<AtlasVertex> getVerticesFromIndexQueryResult(Iterator<AtlasIndexQuery.Result> idxQueryResult, List<AtlasVertex> vertices) { if (idxQueryResult != null) { while (idxQueryResult.hasNext()) { AtlasVertex vertex = idxQueryResult.next().getVertex(); - ret.add(vertex); + vertices.add(vertex); } } - return ret; + return vertices; } - protected List<AtlasVertex> getVertices(Iterator<AtlasVertex> vertices) { - List<AtlasVertex> ret = new ArrayList<>(); - - if (vertices != null) { - while (vertices.hasNext()) { - AtlasVertex vertex = vertices.next(); + protected List<AtlasVertex> getVertices(Iterator<AtlasVertex> iterator, List<AtlasVertex> vertices) { + if (iterator != null) { + while (iterator.hasNext()) { + AtlasVertex vertex = iterator.next(); - ret.add(vertex); + vertices.add(vertex); } } - return ret; + return vertices; } protected Set<String> getGuids(List<AtlasVertex> vertices) { @@ -402,7 +402,24 @@ public abstract class SearchProcessor { return defaultValue; } - private boolean hasOffendingChars(String str) { - return StringUtils.containsAny(str, OFFENDING_CHARS); + private String escapeIndexQueryValue(String value) { + String ret = value; + + if (StringUtils.containsAny(value, OFFENDING_CHARS)) { + boolean isQuoteAtStart = value.charAt(0) == DOUBLE_QUOTE; + boolean isQuoteAtEnd = value.charAt(value.length() - 1) == DOUBLE_QUOTE; + + if (!isQuoteAtStart) { + if (!isQuoteAtEnd) { + ret = DOUBLE_QUOTE + value + DOUBLE_QUOTE; + } else { + ret = DOUBLE_QUOTE + value; + } + } else if (!isQuoteAtEnd) { + ret = value + DOUBLE_QUOTE; + } + } + + return ret; } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/f74e43c2/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java index cd9a47a..43f2c55 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java @@ -103,6 +103,10 @@ public class AtlasGraphUtilsV1 { } } + public static boolean isEntityVertex(AtlasVertex vertex) { + return StringUtils.isNotEmpty(getIdFromVertex(vertex)) && StringUtils.isNotEmpty(getTypeName(vertex)); + } + public static boolean isReference(AtlasType type) { return isReference(type.getTypeCategory()); }