This is an automated email from the ASF dual-hosted git repository.

nixon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 1c034cbaaeeda50224b77fae63ca617221b49f6c Author: Pinal Shah <[email protected]> AuthorDate: Wed May 13 22:27:10 2020 +0530 ATLAS-3788 : BasicSearch: Classification with System attribute(indexed) filters has pagination issue Signed-off-by: nixonrodrigues <[email protected]> --- .../discovery/ClassificationSearchProcessor.java | 13 +- .../test/java/org/apache/atlas/BasicTestSetup.java | 55 ++--- .../discovery/BasicSearchClassificationTest.java | 241 +++++++++++++++++++++ 3 files changed, 272 insertions(+), 37 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java b/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java index 74c088c..5dd0d7f 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java +++ b/repository/src/main/java/org/apache/atlas/discovery/ClassificationSearchProcessor.java @@ -21,12 +21,7 @@ import org.apache.atlas.SortOrder; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria; import org.apache.atlas.repository.Constants; -import org.apache.atlas.repository.graphdb.AtlasEdge; -import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; -import org.apache.atlas.repository.graphdb.AtlasGraph; -import org.apache.atlas.repository.graphdb.AtlasGraphQuery; -import org.apache.atlas.repository.graphdb.AtlasIndexQuery; -import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.graphdb.*; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.type.AtlasClassificationType; import org.apache.atlas.util.AtlasGremlinQueryProvider; @@ -44,10 +39,6 @@ import javax.script.ScriptEngine; import javax.script.ScriptException; import java.util.*; -import static org.apache.atlas.discovery.SearchContext.MATCH_ALL_CLASSIFIED; -import static 
org.apache.atlas.discovery.SearchContext.MATCH_ALL_NOT_CLASSIFIED; -import static org.apache.atlas.discovery.SearchContext.MATCH_ALL_WILDCARD_CLASSIFICATION; - /** * This class is needed when this is a registered classification type or wildcard search, * registered classification includes special type as well. (tag filters will be ignored, and front-end should not enable @@ -293,6 +284,8 @@ public class ClassificationSearchProcessor extends SearchProcessor { getVerticesFromIndexQueryResult(queryResult, classificationVertices); + isLastResultPage = classificationVertices.size() < limit; + // Do in-memory filtering before the graph query CollectionUtils.filter(classificationVertices, inMemoryPredicate); } diff --git a/repository/src/test/java/org/apache/atlas/BasicTestSetup.java b/repository/src/test/java/org/apache/atlas/BasicTestSetup.java index 958781e..7508f78 100644 --- a/repository/src/test/java/org/apache/atlas/BasicTestSetup.java +++ b/repository/src/test/java/org/apache/atlas/BasicTestSetup.java @@ -18,8 +18,6 @@ package org.apache.atlas; import com.google.common.collect.ImmutableList; -import org.apache.atlas.AtlasClient; -import org.apache.atlas.TestUtilsV2; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; @@ -58,6 +56,7 @@ public abstract class BasicTestSetup { public static final String ETL_CLASSIFICATION = "ETL"; public static final String JDBC_CLASSIFICATION = "JdbcAccess"; public static final String LOGDATA_CLASSIFICATION = "Log Data"; + public static final String DIMENSIONAL_CLASSIFICATION = "Dimensional"; @Inject protected AtlasTypeRegistry typeRegistry; @@ -143,11 +142,11 @@ public abstract class BasicTestSetup { List<AtlasEntity> salesFactColumns = ImmutableList .of(column("time_id", "int", "time id"), column("product_id", "int", "product id"), - column("customer_id", "int", "customer id", "PII"), - column("sales", "double", 
"product id", "Metric")); + column("customer_id", "int", "customer id", PII_CLASSIFICATION), + column("sales", "double", "product id", METRIC_CLASSIFICATION)); entities.addAll(salesFactColumns); - AtlasEntity salesFact = table("sales_fact", "sales fact table", salesDB, sd, "Joe", "Managed", salesFactColumns, "Fact"); + AtlasEntity salesFact = table("sales_fact", "sales fact table", salesDB, sd, "Joe", "Managed", salesFactColumns, FACT_CLASSIFICATION); salesFact.setAttribute("createTime", new Date(2018, 01, 01)); entities.add(salesFact); @@ -155,7 +154,7 @@ public abstract class BasicTestSetup { .of(column("time_id", "int", "time id"), column("app_id", "int", "app id"), column("machine_id", "int", "machine id"), - column("log", "string", "log data", "Log Data")); + column("log", "string", "log data", LOGDATA_CLASSIFICATION)); entities.addAll(logFactColumns); List<AtlasEntity> timeDimColumns = ImmutableList @@ -168,7 +167,7 @@ public abstract class BasicTestSetup { entities.add(sd); AtlasEntity timeDim = table("time_dim", "time dimension table", salesDB, sd, "John Doe", "External", timeDimColumns, - "Dimension"); + DIMENSION_CLASSIFICATION); entities.add(timeDim); AtlasEntity reportingDB = @@ -180,32 +179,32 @@ public abstract class BasicTestSetup { AtlasEntity salesFactDaily = table("sales_fact_daily_mv", "sales fact daily materialized view", reportingDB, sd, "Joe BI", "Managed", - salesFactColumns, "Metric"); + salesFactColumns, METRIC_CLASSIFICATION); salesFactDaily.setAttribute("createTime", Date.from(LocalDate.of(2016, 8, 19).atStartOfDay(ZoneId.systemDefault()).toInstant())); entities.add(salesFactDaily); sd = storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, ImmutableList.of(column("time_id", "int", "time id"))); entities.add(sd); - AtlasEntity circularLineageTable1 = table("table1", "", reportingDB, sd, "Vimal", "Managed", salesFactColumns, "Metric"); + AtlasEntity circularLineageTable1 = table("table1", 
"", reportingDB, sd, "Vimal", "Managed", salesFactColumns, METRIC_CLASSIFICATION); entities.add(circularLineageTable1); sd = storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, ImmutableList.of(column("time_id", "int", "time id"))); entities.add(sd); - AtlasEntity circularLineageTable2 = table("table2", "", reportingDB, sd, "Vimal 2", "Managed", salesFactColumns, "Metric"); + AtlasEntity circularLineageTable2 = table("table2", "", reportingDB, sd, "Vimal 2", "Managed", salesFactColumns, METRIC_CLASSIFICATION); entities.add(circularLineageTable2); AtlasEntity circularLineage1Process = loadProcess("circularLineage1", "hive query for daily summary", "John ETL", ImmutableList.of(circularLineageTable1), - ImmutableList.of(circularLineageTable2), "create table as select ", "plan", "id", "graph", "ETL"); + ImmutableList.of(circularLineageTable2), "create table as select ", "plan", "id", "graph", ETL_CLASSIFICATION); entities.add(circularLineage1Process); AtlasEntity circularLineage2Process = loadProcess("circularLineage2", "hive query for daily summary", "John ETL", ImmutableList.of(circularLineageTable2), - ImmutableList.of(circularLineageTable1), "create table as select ", "plan", "id", "graph", "ETL"); + ImmutableList.of(circularLineageTable1), "create table as select ", "plan", "id", "graph", ETL_CLASSIFICATION); entities.add(circularLineage2Process); AtlasEntity loadSalesDaily = loadProcess("loadSalesDaily", "hive query for daily summary", "John ETL", ImmutableList.of(salesFact, timeDim), - ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", "ETL"); + ImmutableList.of(salesFactDaily), "create table as select ", "plan", "id", "graph", ETL_CLASSIFICATION); entities.add(loadSalesDaily); AtlasEntity logDB = database("Logging", "logging database", "Tim ETL", "hdfs://host:8000/apps/warehouse/logging"); @@ -216,7 +215,7 @@ public abstract class BasicTestSetup { AtlasEntity loggingFactDaily = 
table("log_fact_daily_mv", "log fact daily materialized view", logDB, sd, "Tim ETL", "Managed", - logFactColumns, "Log Data"); + logFactColumns, LOGDATA_CLASSIFICATION); entities.add(loggingFactDaily); List<AtlasEntity> productDimColumns = ImmutableList @@ -230,27 +229,27 @@ public abstract class BasicTestSetup { AtlasEntity productDim = table("product_dim", "product dimension table", salesDB, sd, "John Doe 2", "Managed", productDimColumns, - "Dimension"); + DIMENSION_CLASSIFICATION); entities.add(productDim); - AtlasEntity productDimView = view("product_dim_view", reportingDB, ImmutableList.of(productDim), "Dimension", "JdbcAccess"); + AtlasEntity productDimView = view("product_dim_view", reportingDB, ImmutableList.of(productDim), DIMENSION_CLASSIFICATION, JDBC_CLASSIFICATION); entities.add(productDimView); List<AtlasEntity> customerDimColumns = ImmutableList.of( - column("customer_id", "int", "customer id", "PII"), - column("name", "string", "customer name", "PII"), - column("address", "string", "customer address", "PII")); + column("customer_id", "int", "customer id", PII_CLASSIFICATION), + column("name", "string", "customer name", PII_CLASSIFICATION), + column("address", "string", "customer address", PII_CLASSIFICATION)); entities.addAll(customerDimColumns); sd = storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, ImmutableList.of(column("time_id", "int", "time id"))); entities.add(sd); AtlasEntity customerDim = - table("customer_dim", "customer dimension table", salesDB, sd, "fetl", "External", customerDimColumns, - "Dimension"); + table("customer_dim", "customer dimension table", salesDB, sd, "fetl", "External", customerDimColumns, + DIMENSION_CLASSIFICATION); entities.add(customerDim); - AtlasEntity customerDimView = view("customer_dim_view", reportingDB, ImmutableList.of(customerDim), "Dimension", "JdbcAccess"); + AtlasEntity customerDimView = view("customer_dim_view", reportingDB, 
ImmutableList.of(customerDim), DIMENSION_CLASSIFICATION, JDBC_CLASSIFICATION); entities.add(customerDimView); sd = storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, ImmutableList.of(column("time_id", "int", "time id"))); @@ -258,11 +257,11 @@ public abstract class BasicTestSetup { AtlasEntity salesFactMonthly = table("sales_fact_monthly_mv", "sales fact monthly materialized view", reportingDB, sd, "Jane BI", - "Managed", salesFactColumns, "Metric"); + "Managed", salesFactColumns, METRIC_CLASSIFICATION); entities.add(salesFactMonthly); AtlasEntity loadSalesMonthly = loadProcess("loadSalesMonthly", "hive query for monthly summary", "John ETL", ImmutableList.of(salesFactDaily), - ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph", "ETL"); + ImmutableList.of(salesFactMonthly), "create table as select ", "plan", "id", "graph", ETL_CLASSIFICATION); entities.add(loadSalesMonthly); sd = storageDescriptor("hdfs://host:8000/apps/warehouse/sales", "TextInputFormat", "TextOutputFormat", true, ImmutableList.of(column("time_id", "int", "time id"))); @@ -270,11 +269,11 @@ public abstract class BasicTestSetup { AtlasEntity loggingFactMonthly = table("logging_fact_monthly_mv", "logging fact monthly materialized view", logDB, sd, "Tim ETL 2", - "Managed", logFactColumns, "Log Data"); + "Managed", logFactColumns, LOGDATA_CLASSIFICATION); entities.add(loggingFactMonthly); AtlasEntity loadLogsMonthly = loadProcess("loadLogsMonthly", "hive query for monthly summary", "Tim ETL", ImmutableList.of(loggingFactDaily), - ImmutableList.of(loggingFactMonthly), "create table as select ", "plan", "id", "graph", "ETL"); + ImmutableList.of(loggingFactMonthly), "create table as select ", "plan", "id", "graph", ETL_CLASSIFICATION); entities.add(loadLogsMonthly); AtlasEntity datasetSubType = datasetSubType("dataSetSubTypeInst1", "testOwner"); @@ -290,7 +289,9 @@ public abstract class BasicTestSetup { new 
AtlasClassificationDef(METRIC_CLASSIFICATION, "Metric Classification", "1.0"), new AtlasClassificationDef(ETL_CLASSIFICATION, "ETL Classification", "1.0"), new AtlasClassificationDef(JDBC_CLASSIFICATION, "JdbcAccess Classification", "1.0"), - new AtlasClassificationDef(LOGDATA_CLASSIFICATION, "LogData Classification", "1.0")); + new AtlasClassificationDef(LOGDATA_CLASSIFICATION, "LogData Classification", "1.0"), + new AtlasClassificationDef(DIMENSIONAL_CLASSIFICATION,"Dimensional Classification", "1.0" , + Arrays.asList(new AtlasStructDef.AtlasAttributeDef("attr1","string")))); AtlasTypesDef tds = new AtlasTypesDef(Collections.<AtlasEnumDef>emptyList(), Collections.<AtlasStructDef>emptyList(), diff --git a/repository/src/test/java/org/apache/atlas/discovery/BasicSearchClassificationTest.java b/repository/src/test/java/org/apache/atlas/discovery/BasicSearchClassificationTest.java new file mode 100644 index 0000000..95f782a --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/discovery/BasicSearchClassificationTest.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.discovery; + +import org.apache.atlas.AtlasClient; +import org.apache.atlas.BasicTestSetup; +import org.apache.atlas.TestModules; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.discovery.SearchParameters; +import org.apache.atlas.model.instance.AtlasClassification; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream; +import org.apache.commons.collections.CollectionUtils; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import javax.inject.Inject; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static org.apache.atlas.model.discovery.SearchParameters.*; +import static org.testng.Assert.assertEquals; + +@Guice(modules = TestModules.TestOnlyModule.class) +public class BasicSearchClassificationTest extends BasicTestSetup { + + @Inject + private AtlasDiscoveryService discoveryService; + + private int totalEntities = 0; + private int totalClassifiedEntities = 0; + private int getTotalClassifiedEntitiesHistorical = 0; + private int dimensionTagEntities = 10; + private String dimensionTagDeleteGuid; + private String dimensionalTagGuid; + + @BeforeClass + public void setup() throws AtlasBaseException { + setupTestData(); + createDimensionTaggedEntityAndDelete(); + createDimensionalTaggedEntityWithAttr(); + } + + @Test(priority = -1) + public void searchByALLTag() throws AtlasBaseException { + SearchParameters params = new SearchParameters(); + params.setClassification(ALL_CLASSIFICATION_TYPES); + + List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); + + Assert.assertTrue(CollectionUtils.isNotEmpty(entityHeaders)); + + 
totalEntities = getEntityCount(); + totalClassifiedEntities = entityHeaders.size(); + getTotalClassifiedEntitiesHistorical = getEntityWithTagCountHistorical(); + } + + @Test + public void searchByALLTagAndIndexSysFilters() throws AtlasBaseException { + SearchParameters params = new SearchParameters(); + params.setClassification(ALL_CLASSIFICATION_TYPES); + FilterCriteria filterCriteria = getSingleFilterCondition("__timestamp", Operator.LT, String.valueOf(System.currentTimeMillis())); + params.setTagFilters(filterCriteria); + + List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); + + assertEquals(entityHeaders.size(), totalClassifiedEntities); + } + + @Test + public void searchByALLTagAndIndexSysFiltersToTestLimit() throws AtlasBaseException { + SearchParameters params = new SearchParameters(); + params.setClassification(ALL_CLASSIFICATION_TYPES); + FilterCriteria filterCriteria = getSingleFilterCondition("__timestamp", Operator.LT, String.valueOf(System.currentTimeMillis())); + params.setTagFilters(filterCriteria); + params.setLimit(totalClassifiedEntities - 2); + + List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); + + assertEquals(entityHeaders.size(), totalClassifiedEntities - 2); + } + + @Test + public void searchByNOTCLASSIFIED() throws AtlasBaseException { + SearchParameters params = new SearchParameters(); + params.setClassification(NO_CLASSIFICATIONS); + + List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); + + assertEquals(entityHeaders.size(), totalEntities - totalClassifiedEntities); + } + + @Test + public void searchByTag() throws AtlasBaseException { + SearchParameters params = new SearchParameters(); + params.setClassification(DIMENSION_CLASSIFICATION); + + List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); + + assertEquals(entityHeaders.size(), 
dimensionTagEntities); + } + + @Test + public void searchByTagAndTagFilters() throws AtlasBaseException { + SearchParameters params = new SearchParameters(); + params.setClassification(DIMENSIONAL_CLASSIFICATION); + FilterCriteria filterCriteria = getSingleFilterCondition("attr1", Operator.EQ, "Test"); + params.setTagFilters(filterCriteria); + + List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); + + assertEquals(entityHeaders.size(), 1); + assertEquals(entityHeaders.get(0).getGuid(), dimensionalTagGuid); + + } + + @Test + public void searchByTagAndIndexSysFilters() throws AtlasBaseException { + + SearchParameters params = new SearchParameters(); + params.setClassification(DIMENSION_CLASSIFICATION); + FilterCriteria filterCriteria = getSingleFilterCondition("__timestamp", Operator.LT, String.valueOf(System.currentTimeMillis())); + params.setTagFilters(filterCriteria); + + List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); + + assertEquals(entityHeaders.size(), dimensionTagEntities); + } + + @Test + public void searchByWildcardTag() throws AtlasBaseException { + SearchParameters params = new SearchParameters(); + params.setClassification("Dimension*"); + + List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); + + assertEquals(entityHeaders.size(), dimensionTagEntities + 1); + + } + + //@Test + public void searchByTagAndGraphSysFilters() throws AtlasBaseException { + SearchParameters params = new SearchParameters(); + params.setClassification(DIMENSION_CLASSIFICATION); + FilterCriteria filterCriteria = getSingleFilterCondition("__entityStatus", Operator.EQ, "DELETED"); + params.setTagFilters(filterCriteria); + params.setExcludeDeletedEntities(false); + + List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); + + assertEquals(entityHeaders.size(), 1); + 
assertEquals(entityHeaders.get(0).getGuid(), dimensionTagDeleteGuid); + + } + + private void createDimensionTaggedEntityAndDelete() throws AtlasBaseException { + AtlasEntity entityToDelete = new AtlasEntity(HIVE_TABLE_TYPE); + entityToDelete.setAttribute("name", "entity to be deleted"); + entityToDelete.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, "entity.tobedeleted"); + + List<AtlasClassification> cls = new ArrayList<>(); + cls.add(new AtlasClassification(DIMENSION_CLASSIFICATION)); + entityToDelete.setClassifications(cls); + + //create entity + EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(new AtlasEntity.AtlasEntitiesWithExtInfo(entityToDelete)), false); + AtlasEntityHeader entityHeader = response.getCreatedEntities().get(0); + dimensionTagDeleteGuid = entityHeader.getGuid(); + + //delete entity + entityStore.deleteById(dimensionTagDeleteGuid); + } + + private void createDimensionalTaggedEntityWithAttr() throws AtlasBaseException { + AtlasEntity entityToDelete = new AtlasEntity(HIVE_TABLE_TYPE); + entityToDelete.setAttribute("name", "Entity1"); + entityToDelete.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, "entity.one"); + + List<AtlasClassification> cls = new ArrayList<>(); + cls.add(new AtlasClassification(DIMENSIONAL_CLASSIFICATION, new HashMap<String, Object>() {{ + put("attr1", "Test"); + }})); + entityToDelete.setClassifications(cls); + + //create entity + final EntityMutationResponse response = entityStore.createOrUpdate(new AtlasEntityStream(new AtlasEntity.AtlasEntitiesWithExtInfo(entityToDelete)), false); + AtlasEntityHeader entityHeader = response.getCreatedEntities().get(0); + dimensionalTagGuid = entityHeader.getGuid(); + + } + + private int getEntityCount() throws AtlasBaseException { + SearchParameters params = new SearchParameters(); + params.setTypeName(ALL_ENTITY_TYPES); + + List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); + return 
entityHeaders.size(); + } + + private int getEntityWithTagCountHistorical() throws AtlasBaseException { + SearchParameters params = new SearchParameters(); + params.setClassification(ALL_CLASSIFICATION_TYPES); + params.setExcludeDeletedEntities(false); + + List<AtlasEntityHeader> entityHeaders = discoveryService.searchWithParameters(params).getEntities(); + return entityHeaders.size(); + } + private FilterCriteria getSingleFilterCondition(String attName, Operator op, String attrValue) { + FilterCriteria filterCriteria = new FilterCriteria(); + filterCriteria.setCondition(FilterCriteria.Condition.AND); + List<FilterCriteria> criteria = new ArrayList<>(); + FilterCriteria f1 = new FilterCriteria(); + f1.setAttributeName(attName); + f1.setOperator(op); + String time = String.valueOf(System.currentTimeMillis()); + f1.setAttributeValue(attrValue); + criteria.add(f1); + filterCriteria.setCriterion(criteria); + return filterCriteria; + } +}
