http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/web/resources/MetadataDiscoveryResource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/MetadataDiscoveryResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/MetadataDiscoveryResource.java index d7c66d3..ff1751d 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/MetadataDiscoveryResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/MetadataDiscoveryResource.java @@ -22,8 +22,6 @@ import com.google.common.base.Preconditions; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.classification.InterfaceAudience; -import org.apache.atlas.discovery.DiscoveryException; -import org.apache.atlas.discovery.DiscoveryService; import org.apache.atlas.query.QueryParams; import org.apache.atlas.utils.AtlasPerfTracer; import org.apache.atlas.utils.ParamChecker; @@ -46,6 +44,7 @@ import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -65,8 +64,6 @@ public class MetadataDiscoveryResource { private static final String QUERY_TYPE_FULLTEXT = "full-text"; private static final String LIMIT_OFFSET_DEFAULT = "-1"; - private final DiscoveryService discoveryService; - private final boolean gremlinSearchEnabled; private static Configuration applicationProperties = null; private static final String ENABLE_GREMLIN_SEARCH_PROPERTY = "atlas.search.gremlin.enable"; @@ -75,11 +72,10 @@ public class MetadataDiscoveryResource { * Created by the Guice ServletModule and injected with the * configured DiscoveryService. * - * @param discoveryService metadata service handle + * @param configuration configuration */ @Inject - public MetadataDiscoveryResource(DiscoveryService discoveryService, Configuration configuration) { - this.discoveryService = discoveryService; + public MetadataDiscoveryResource(Configuration configuration) { applicationProperties = configuration; gremlinSearchEnabled = applicationProperties != null && applicationProperties.getBoolean(ENABLE_GREMLIN_SEARCH_PROPERTY, false); } @@ -152,12 +148,12 @@ public class MetadataDiscoveryResource { dslQuery = ParamChecker.notEmpty(dslQuery, "dslQuery cannot be null"); QueryParams queryParams = validateQueryParams(limit, offset); - final String jsonResultStr = discoveryService.searchByDSL(dslQuery, queryParams); + final String jsonResultStr = ""; // TODO-typeSystem-removal: discoveryService.searchByDSL(dslQuery, queryParams); JSONObject response = new DSLJSONResponseBuilder().results(jsonResultStr).query(dslQuery).build(); return Response.ok(response).build(); - } catch (DiscoveryException | IllegalArgumentException e) { + } catch (IllegalArgumentException e) { LOG.error("Unable to get entity list for dslQuery {}", dslQuery, e); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); } catch (WebApplicationException e) { @@ -220,11 +216,11 @@ public class MetadataDiscoveryResource { } if (!gremlinSearchEnabled) { - throw new DiscoveryException("Gremlin search is not enabled."); + throw new Exception("Gremlin search is not enabled."); } gremlinQuery = ParamChecker.notEmpty(gremlinQuery, "gremlinQuery cannot be null or empty"); - final List<Map<String, String>> results = discoveryService.searchByGremlin(gremlinQuery); + final List<Map<String, String>> results = new ArrayList<>(); // TODO-typeSystem-removal: discoveryService.searchByGremlin(gremlinQuery); JSONObject response = new JSONObject(); response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); @@ -239,7 +235,7 @@ public class MetadataDiscoveryResource { response.put(AtlasClient.COUNT, list.length()); return Response.ok(response).build(); - } catch (DiscoveryException | IllegalArgumentException e) { + } catch (IllegalArgumentException e) { LOG.error("Unable to get entity list for gremlinQuery {}", gremlinQuery, e); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); } catch (WebApplicationException e) { @@ -284,12 +280,12 @@ public class MetadataDiscoveryResource { query = ParamChecker.notEmpty(query, "query cannot be null or empty"); QueryParams queryParams = validateQueryParams(limit, offset); - final String jsonResultStr = discoveryService.searchByFullText(query, queryParams); + final String jsonResultStr = ""; // TODO-typeSystem-removal: discoveryService.searchByFullText(query, queryParams); JSONArray rowsJsonArr = new JSONArray(jsonResultStr); JSONObject response = new FullTextJSonResponseBuilder().results(rowsJsonArr).query(query).build(); return Response.ok(response).build(); - } catch (DiscoveryException | IllegalArgumentException e) { + } catch (IllegalArgumentException e) { LOG.error("Unable to get entity list for query {}", query, e); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); } catch (WebApplicationException e) {
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/web/resources/TypesResource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/TypesResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/TypesResource.java index a9c5509..9b2d7b2 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/TypesResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/TypesResource.java @@ -23,10 +23,10 @@ import com.sun.jersey.api.core.ResourceContext; import org.apache.atlas.AtlasClient; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.typedef.AtlasTypesDef; +import org.apache.atlas.v1.model.typedef.TypesDef; import org.apache.atlas.store.AtlasTypeDefStore; +import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.typesystem.TypesDef; -import org.apache.atlas.typesystem.json.TypesSerialization; import org.apache.atlas.repository.converters.TypeConverterUtil; import org.apache.atlas.utils.AtlasPerfTracer; import org.apache.atlas.web.rest.TypesREST; @@ -237,7 +237,7 @@ public class TypesResource { try { TypesDef typesDef = TypeConverterUtil.toTypesDef(typeRegistry.getType(typeName), typeRegistry);; - String typeDefinition = TypesSerialization.toJson(typesDef); + String typeDefinition = AtlasType.toV1Json(typesDef); response.put(AtlasClient.TYPENAME, typeName); response.put(AtlasClient.DEFINITION, new JSONObject(typeDefinition)); http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceState.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceState.java b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceState.java index c6b4a6f..ea8b738 100644 --- a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceState.java +++ b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceState.java @@ -33,10 +33,10 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; -import scala.actors.threadpool.Arrays; import javax.inject.Inject; import java.nio.charset.Charset; +import java.util.Arrays; import java.util.List; /** http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java b/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java index ad2a697..562d9b7 100644 --- a/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java +++ b/webapp/src/main/java/org/apache/atlas/web/util/LineageUtils.java @@ -23,13 +23,11 @@ import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.lineage.AtlasLineageInfo; import org.apache.atlas.model.typedef.AtlasBaseTypeDef; -import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.repository.Constants; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.typesystem.Struct; -import org.apache.atlas.typesystem.json.InstanceSerialization; -import org.apache.atlas.typesystem.types.TypeSystem; +import org.apache.atlas.v1.model.instance.Struct; import java.util.ArrayList; import java.util.HashMap; @@ -38,21 +36,19 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_PREFIX; -import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_ARRAY_SUFFIX; public final class LineageUtils { private LineageUtils() {} - private static final String VERTICES_ATTR_NAME = "vertices"; - private static final String EDGES_ATTR_NAME = "edges"; private static final String VERTEX_ID_ATTR_NAME = "vertexId"; private static final String TEMP_STRUCT_ID_RESULT = "__IdType"; private static final AtomicInteger COUNTER = new AtomicInteger(); - public static String toLineageStruct(AtlasLineageInfo lineageInfo, AtlasTypeRegistry registry) throws AtlasBaseException { - String ret = null; + public static Struct toLineageStruct(AtlasLineageInfo lineageInfo, AtlasTypeRegistry registry) throws AtlasBaseException { + Struct ret = new Struct(); + + ret.setTypeName(Constants.TEMP_STRUCT_NAME_PREFIX + COUNTER.getAndIncrement()); if (lineageInfo != null) { Map<String, AtlasEntityHeader> entities = lineageInfo.getGuidEntityMap(); @@ -66,11 +62,10 @@ public final class LineageUtils { if (isDataSet(entityHeader.getTypeName(), registry)) { Map<String, Object> vertexIdMap = new HashMap<>(); - TypeSystem.IdType idType = TypeSystem.getInstance().getIdType(); - vertexIdMap.put(idType.idAttrName(), guid); - vertexIdMap.put(idType.stateAttrName(), (entityHeader.getStatus() == AtlasEntity.Status.ACTIVE) ? "ACTIVE" : "DELETED"); - vertexIdMap.put(idType.typeNameAttrName(), entityHeader.getTypeName()); + vertexIdMap.put(Constants.ATTRIBUTE_NAME_GUID, guid); + vertexIdMap.put(Constants.ATTRIBUTE_NAME_STATE, (entityHeader.getStatus() == AtlasEntity.Status.ACTIVE) ? "ACTIVE" : "DELETED"); + vertexIdMap.put(Constants.ATTRIBUTE_NAME_TYPENAME, entityHeader.getTypeName()); Object qualifiedName = entityHeader.getAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME); if (qualifiedName == null) { @@ -106,11 +101,8 @@ public final class LineageUtils { } } - Map<String, Object> map = new HashMap<>(); - map.put(VERTICES_ATTR_NAME, verticesMap); - map.put(EDGES_ATTR_NAME, edgesMap); - - ret = InstanceSerialization.toJson(constructResultStruct(map, false), false); + ret.set("vertices", verticesMap); + ret.set("edges", edgesMap); } return ret; @@ -121,7 +113,7 @@ public final class LineageUtils { return new Struct(TEMP_STRUCT_ID_RESULT, values); } - return new Struct(org.apache.atlas.query.TypeUtils.TEMP_STRUCT_NAME_PREFIX() + COUNTER.getAndIncrement(), values); + return new Struct(Constants.TEMP_STRUCT_NAME_PREFIX + COUNTER.getAndIncrement(), values); } private static boolean isDataSet(String typeName, AtlasTypeRegistry registry) throws AtlasBaseException { http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/test/java/org/apache/atlas/examples/QuickStartIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/examples/QuickStartIT.java b/webapp/src/test/java/org/apache/atlas/examples/QuickStartIT.java index 592c2a6..1b5e811 100644 --- a/webapp/src/test/java/org/apache/atlas/examples/QuickStartIT.java +++ b/webapp/src/test/java/org/apache/atlas/examples/QuickStartIT.java @@ -20,8 +20,8 @@ package org.apache.atlas.examples; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasServiceException; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.v1.model.instance.Id; +import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.web.integration.BaseResourceIT; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java b/webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java index 3b4ba02..e65d678 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java @@ -26,7 +26,7 @@ public class AdaptiveWaiterTest { private final int maxDuration = 100; private final int minDuration = 5; - private final int increment = 5; + private final int increment = 5; private NotificationHookConsumer.AdaptiveWaiter waiter; @BeforeClass @@ -36,11 +36,13 @@ public class AdaptiveWaiterTest { @Test public void basicTest() { - for (int i = 0; i < 20; i++) { + int pauseCount = 10; + + for (int i = 0; i < pauseCount; i++) { waiter.pause(new IllegalStateException()); } - assertEquals(waiter.waitDuration, 95); + assertEquals(waiter.waitDuration, Math.min((pauseCount + 1) * minDuration, maxDuration)); // waiter.waitDuration will be set to wait time for next pause() } @Test @@ -63,6 +65,6 @@ public class AdaptiveWaiterTest { } waiter.pause(new IllegalArgumentException()); - assertEquals(waiter.waitDuration, 5); + assertEquals(waiter.waitDuration, minDuration); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java index 7e94330..486b30b 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java +++ b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java @@ -18,25 +18,21 @@ package org.apache.atlas.notification; -import com.google.common.collect.ImmutableSet; import org.apache.atlas.AtlasClient; import org.apache.atlas.kafka.NotificationProvider; -import org.apache.atlas.notification.entity.EntityNotification; -import org.apache.atlas.typesystem.IReferenceableInstance; -import org.apache.atlas.typesystem.IStruct; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.Struct; -import org.apache.atlas.typesystem.json.InstanceSerialization; -import org.apache.atlas.typesystem.json.TypesSerialization$; -import org.apache.atlas.typesystem.persistence.Id; -import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition; -import org.apache.atlas.typesystem.types.TraitType; -import org.apache.atlas.typesystem.types.utils.TypesUtil; +import org.apache.atlas.notification.NotificationInterface.NotificationType; +import org.apache.atlas.v1.model.instance.Id; +import org.apache.atlas.v1.model.instance.Referenceable; +import org.apache.atlas.v1.model.instance.Struct; +import org.apache.atlas.v1.model.notification.EntityNotificationV1; +import org.apache.atlas.v1.model.notification.EntityNotificationV1.OperationType; +import org.apache.atlas.v1.model.typedef.*; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.v1.typesystem.types.utils.TypesUtil; import org.apache.atlas.web.integration.BaseResourceIT; import org.testng.annotations.BeforeClass; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; + +import java.util.*; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -46,33 +42,35 @@ import static org.testng.Assert.assertTrue; * Entity Notification Integration Tests. */ public class EntityNotificationIT extends BaseResourceIT { - - private final String DATABASE_NAME = "db" + randomString(); - private final String TABLE_NAME = "table" + randomString(); - private NotificationInterface notificationInterface = NotificationProvider.get(); - private Id tableId; - private Id dbId; - private String traitName; - private NotificationConsumer notificationConsumer; + private final String DATABASE_NAME = "db" + randomString(); + private final String TABLE_NAME = "table" + randomString(); + private final NotificationInterface notificationInterface = NotificationProvider.get(); + private Id tableId; + private Id dbId; + private String traitName; + private NotificationConsumer notificationConsumer; @BeforeClass public void setUp() throws Exception { super.setUp(); + createTypeDefinitionsV1(); + Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(DATABASE_NAME); + dbId = createInstance(HiveDBInstance); - notificationConsumer = notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1).get(0); + notificationConsumer = notificationInterface.createConsumers(NotificationType.ENTITIES, 1).get(0); } public void testCreateEntity() throws Exception { Referenceable tableInstance = createHiveTableInstanceBuiltIn(DATABASE_NAME, TABLE_NAME, dbId); + tableId = createInstance(tableInstance); final String guid = tableId._getId(); - waitForNotification(notificationConsumer, MAX_WAIT_TIME, - newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid)); + waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid)); } public void testUpdateEntity() throws Exception { @@ -83,83 +81,83 @@ public class EntityNotificationIT extends BaseResourceIT { atlasClientV1.updateEntityAttribute(guid, property, newValue); - waitForNotification(notificationConsumer, MAX_WAIT_TIME, - newNotificationPredicate(EntityNotification.OperationType.ENTITY_UPDATE, HIVE_TABLE_TYPE_BUILTIN, guid)); + waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.ENTITY_UPDATE, HIVE_TABLE_TYPE_BUILTIN, guid)); } public void testDeleteEntity() throws Exception { - final String tableName = "table-" + randomString(); - final String dbName = "db-" + randomString(); - Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(dbName); - Id dbId = createInstance(HiveDBInstance); + final String tableName = "table-" + randomString(); + final String dbName = "db-" + randomString(); + final Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(dbName); + final Id dbId = createInstance(HiveDBInstance); + final Referenceable tableInstance = createHiveTableInstanceBuiltIn(dbName, tableName, dbId); + final Id tableId = createInstance(tableInstance); + final String guid = tableId._getId(); - Referenceable tableInstance = createHiveTableInstanceBuiltIn(dbName, tableName, dbId); - final Id tableId = createInstance(tableInstance); - final String guid = tableId._getId(); - - waitForNotification(notificationConsumer, MAX_WAIT_TIME, - newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid)); + waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid)); final String name = (String) tableInstance.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME); atlasClientV1.deleteEntity(HIVE_TABLE_TYPE_BUILTIN, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name); - waitForNotification(notificationConsumer, MAX_WAIT_TIME, - newNotificationPredicate(EntityNotification.OperationType.ENTITY_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid)); + waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.ENTITY_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid)); } public void testAddTrait() throws Exception { String superSuperTraitName = "SuperTrait" + randomString(); - createTrait(superSuperTraitName); - - String superTraitName = "SuperTrait" + randomString(); - createTrait(superTraitName, superSuperTraitName); + String superTraitName = "SuperTrait" + randomString(); traitName = "Trait" + randomString(); + + createTrait(superSuperTraitName); + createTrait(superTraitName, superSuperTraitName); createTrait(traitName, superTraitName); - Struct traitInstance = new Struct(traitName); - String traitInstanceJSON = InstanceSerialization.toJson(traitInstance, true); + Struct traitInstance = new Struct(traitName); + String traitInstanceJSON = AtlasType.toV1Json(traitInstance); + LOG.debug("Trait instance = {}", traitInstanceJSON); final String guid = tableId._getId(); atlasClientV1.addTrait(guid, traitInstance); - EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, - newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid)); + EntityNotificationV1 entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid)); + + Referenceable entity = entityNotification.getEntity(); - IReferenceableInstance entity = entityNotification.getEntity(); - assertTrue(entity.getTraits().contains(traitName)); + assertTrue(entity.getTraitNames().contains(traitName)); - List<IStruct> allTraits = entityNotification.getAllTraits(); + List<Struct> allTraits = entityNotification.getAllTraits(); List<String> allTraitNames = new LinkedList<>(); - for (IStruct struct : allTraits) { + for (Struct struct : allTraits) { allTraitNames.add(struct.getTypeName()); } + assertTrue(allTraitNames.contains(traitName)); assertTrue(allTraitNames.contains(superTraitName)); assertTrue(allTraitNames.contains(superSuperTraitName)); String anotherTraitName = "Trait" + randomString(); + createTrait(anotherTraitName, superTraitName); - traitInstance = new Struct(anotherTraitName); - traitInstanceJSON = InstanceSerialization.toJson(traitInstance, true); + traitInstance = new Struct(anotherTraitName); + traitInstanceJSON = AtlasType.toV1Json(traitInstance); + LOG.debug("Trait instance = {}", traitInstanceJSON); atlasClientV1.addTrait(guid, traitInstance); - entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, - newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid)); + entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid)); - allTraits = entityNotification.getAllTraits(); + allTraits = entityNotification.getAllTraits(); allTraitNames = new LinkedList<>(); - for (IStruct struct : allTraits) { + for (Struct struct : allTraits) { allTraitNames.add(struct.getTypeName()); } + assertTrue(allTraitNames.contains(traitName)); assertTrue(allTraitNames.contains(anotherTraitName)); // verify that the super type shows up twice in all traits @@ -171,21 +169,25 @@ public class EntityNotificationIT extends BaseResourceIT { atlasClientV1.deleteTrait(guid, traitName); - EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, - newNotificationPredicate(EntityNotification.OperationType.TRAIT_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid)); + EntityNotificationV1 entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, + newNotificationPredicate(EntityNotificationV1.OperationType.TRAIT_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid)); - assertFalse(entityNotification.getEntity().getTraits().contains(traitName)); + assertFalse(entityNotification.getEntity().getTraitNames().contains(traitName)); } // ----- helper methods --------------------------------------------------- private void createTrait(String traitName, String ... superTraitNames) throws Exception { - HierarchicalTypeDefinition<TraitType> trait = - TypesUtil.createTraitTypeDef(traitName, ImmutableSet.copyOf(superTraitNames)); + TraitTypeDefinition traitDef = TypesUtil.createTraitTypeDef(traitName, null, new HashSet<>(Arrays.asList(superTraitNames))); + TypesDef typesDef = new TypesDef(Collections.<EnumTypeDefinition>emptyList(), + Collections.<StructTypeDefinition>emptyList(), + Collections.singletonList(traitDef), + Collections.<ClassTypeDefinition>emptyList()); + String traitDefinitionJSON = AtlasType.toV1Json(typesDef); - String traitDefinitionJSON = TypesSerialization$.MODULE$.toJson(trait, true); LOG.debug("Trait definition = {}", traitDefinitionJSON); + createType(traitDefinitionJSON); } http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/test/java/org/apache/atlas/notification/NotificationEntityChangeListenerTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationEntityChangeListenerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationEntityChangeListenerTest.java index a988915..084ebb1 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationEntityChangeListenerTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationEntityChangeListenerTest.java @@ -18,11 +18,10 @@ package org.apache.atlas.notification; -import org.apache.atlas.typesystem.IStruct; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.Struct; -import org.apache.atlas.typesystem.types.TraitType; -import org.apache.atlas.typesystem.types.TypeSystem; +import org.apache.atlas.v1.model.instance.Referenceable; +import org.apache.atlas.v1.model.instance.Struct; +import org.apache.atlas.type.AtlasClassificationType; +import org.apache.atlas.type.AtlasTypeRegistry; import org.testng.annotations.Test; import java.util.Collections; @@ -41,45 +40,45 @@ public class NotificationEntityChangeListenerTest { @Test public void testGetAllTraitsSuperTraits() throws Exception { - TypeSystem typeSystem = mock(TypeSystem.class); + AtlasTypeRegistry typeSystem = mock(AtlasTypeRegistry.class); String traitName = "MyTrait"; - IStruct myTrait = new Struct(traitName); + Struct myTrait = new Struct(traitName); String superTraitName = "MySuperTrait"; - TraitType traitDef = mock(TraitType.class); + AtlasClassificationType traitDef = mock(AtlasClassificationType.class); Set<String> superTypeNames = Collections.singleton(superTraitName); - TraitType superTraitDef = mock(TraitType.class); + AtlasClassificationType superTraitDef = mock(AtlasClassificationType.class); Set<String> superSuperTypeNames = Collections.emptySet(); Referenceable entity = getEntity("id", myTrait); - when(typeSystem.getDataType(TraitType.class, traitName)).thenReturn(traitDef); - when(typeSystem.getDataType(TraitType.class, superTraitName)).thenReturn(superTraitDef); + when(typeSystem.getClassificationTypeByName(traitName)).thenReturn(traitDef); + when(typeSystem.getClassificationTypeByName(superTraitName)).thenReturn(superTraitDef); - when(traitDef.getAllSuperTypeNames()).thenReturn(superTypeNames); - when(superTraitDef.getAllSuperTypeNames()).thenReturn(superSuperTypeNames); + when(traitDef.getAllSuperTypes()).thenReturn(superTypeNames); + when(superTraitDef.getAllSuperTypes()).thenReturn(superSuperTypeNames); - List<IStruct> allTraits = NotificationEntityChangeListener.getAllTraits(entity, typeSystem); + List<Struct> allTraits = NotificationEntityChangeListener.getAllTraits(entity, typeSystem); assertEquals(2, allTraits.size()); - for (IStruct trait : allTraits) { + for (Struct trait : allTraits) { String typeName = trait.getTypeName(); assertTrue(typeName.equals(traitName) || typeName.equals(superTraitName)); } } - private Referenceable getEntity(String id, IStruct... traits) { + private Referenceable getEntity(String id, Struct... traits) { String typeName = "typeName"; Map<String, Object> values = new HashMap<>(); List<String> traitNames = new LinkedList<>(); - Map<String, IStruct> traitMap = new HashMap<>(); + Map<String, Struct> traitMap = new HashMap<>(); - for (IStruct trait : traits) { + for (Struct trait : traits) { String traitName = trait.getTypeName(); traitNames.add(traitName); http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java index d41db3e..f248593 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java @@ -20,14 +20,13 @@ package org.apache.atlas.notification; import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.kafka.NotificationProvider; -import org.apache.atlas.notification.hook.HookNotification; -import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; -import org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest; -import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest; -import org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest; -import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.model.notification.HookNotification; +import org.apache.atlas.v1.model.instance.Id; +import org.apache.atlas.v1.model.instance.Referenceable; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityPartialUpdateRequest; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest; import org.apache.atlas.web.integration.BaseResourceIT; import org.codehaus.jettison.json.JSONArray; import org.testng.annotations.AfterClass; @@ -40,18 +39,19 @@ import static java.lang.Thread.sleep; import static org.testng.Assert.assertEquals; public class NotificationHookConsumerIT extends BaseResourceIT { - private static final String TEST_USER = "testuser"; - public static final String NAME = "name"; - public static final String DESCRIPTION = "description"; + + public static final String NAME = "name"; + public static final String DESCRIPTION = "description"; public static final String QUALIFIED_NAME = "qualifiedName"; - public static final String CLUSTER_NAME = "clusterName"; + public static final String CLUSTER_NAME = "clusterName"; - private NotificationInterface notificationInterface = NotificationProvider.get(); + private final NotificationInterface notificationInterface = NotificationProvider.get(); @BeforeClass public void setUp() throws Exception { super.setUp(); + createTypeDefinitionsV1(); } @@ -60,29 +60,33 @@ public class NotificationHookConsumerIT extends BaseResourceIT { notificationInterface.close(); } - private void sendHookMessage(HookNotificationMessage message) throws NotificationException, InterruptedException { + private void sendHookMessage(HookNotification message) throws NotificationException, InterruptedException { notificationInterface.send(NotificationInterface.NotificationType.HOOK, message); + sleep(1000); } @Test public void testMessageHandleFailureConsumerContinues() throws Exception { //send invalid message - update with invalid type - sendHookMessage(new HookNotification.EntityPartialUpdateRequest(TEST_USER, randomString(), null, null, - new Referenceable(randomString()))); + sendHookMessage(new EntityPartialUpdateRequest(TEST_USER, randomString(), null, null, new Referenceable(randomString()))); //send valid message final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN); - String dbName = "db" + randomString(); + final String dbName = "db" + randomString(); + entity.set(NAME, dbName); entity.set(DESCRIPTION, randomString()); entity.set(QUALIFIED_NAME, dbName); entity.set(CLUSTER_NAME, randomString()); + sendHookMessage(new EntityCreateRequest(TEST_USER, entity)); + waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { JSONArray results = searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE_BUILTIN, entity.get(NAME))); + return results.length() == 1; } }); @@ -91,24 +95,28 @@ public class NotificationHookConsumerIT extends BaseResourceIT { @Test public void testCreateEntity() throws Exception { final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN); - String dbName = "db" + randomString(); + final String dbName = "db" + randomString(); + entity.set(NAME, dbName); entity.set(DESCRIPTION, randomString()); entity.set(QUALIFIED_NAME, dbName); entity.set(CLUSTER_NAME, randomString()); sendHookMessage(new EntityCreateRequest(TEST_USER, entity)); + waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, entity.get(QUALIFIED_NAME))); + return results.length() == 1; } }); //Assert that user passed in hook message is used in audit - Referenceable instance = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, (String) entity.get(QUALIFIED_NAME)); - List<EntityAuditEvent> events = atlasClientV1.getEntityAuditEvents(instance.getId()._getId(), (short) 1); + Referenceable instance = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, (String) entity.get(QUALIFIED_NAME)); + List<EntityAuditEvent> events = atlasClientV1.getEntityAuditEvents(instance.getId()._getId(), (short) 1); + assertEquals(events.size(), 1); assertEquals(events.get(0).getUser(), TEST_USER); } @@ -116,7 +124,8 @@ public class NotificationHookConsumerIT extends BaseResourceIT { @Test public void testUpdateEntityPartial() throws Exception { final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN); - final String dbName = "db" + randomString(); + final String dbName = "db" + randomString(); + entity.set(NAME, dbName); entity.set(DESCRIPTION, randomString()); entity.set(QUALIFIED_NAME, dbName); @@ -125,25 +134,31 @@ public class NotificationHookConsumerIT extends BaseResourceIT { atlasClientV1.createEntity(entity); final Referenceable newEntity = new Referenceable(DATABASE_TYPE_BUILTIN); + newEntity.set("owner", randomString()); + sendHookMessage(new EntityPartialUpdateRequest(TEST_USER, DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName, newEntity)); + waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { Referenceable localEntity = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName); + return (localEntity.get("owner") != null && localEntity.get("owner").equals(newEntity.get("owner"))); } }); //Its partial update and un-set fields are not updated Referenceable actualEntity = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName); + assertEquals(actualEntity.get(DESCRIPTION), entity.get(DESCRIPTION)); } @Test public void testUpdatePartialUpdatingQualifiedName() throws Exception { final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN); - final String dbName = "db" + randomString(); + final String dbName = "db" + randomString(); + entity.set(NAME, dbName); entity.set(DESCRIPTION, randomString()); entity.set(QUALIFIED_NAME, dbName); @@ -152,28 +167,32 @@ public class NotificationHookConsumerIT extends BaseResourceIT { atlasClientV1.createEntity(entity); final Referenceable newEntity = new Referenceable(DATABASE_TYPE_BUILTIN); - final String newName = "db" + randomString(); + final String newName = "db" + randomString(); + newEntity.set(QUALIFIED_NAME, newName); sendHookMessage(new EntityPartialUpdateRequest(TEST_USER, DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName, newEntity)); + waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, newName)); + return results.length() == 1; } }); //no entity with the old qualified name JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, dbName)); - assertEquals(results.length(), 0); + assertEquals(results.length(), 0); } @Test public void testDeleteByQualifiedName() throws Exception { - Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN); - final String dbName = "db" + randomString(); + final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN); + final String dbName = "db" + randomString(); + entity.set(NAME, dbName); entity.set(DESCRIPTION, randomString()); entity.set(QUALIFIED_NAME, dbName); @@ -182,10 +201,12 @@ public class NotificationHookConsumerIT extends BaseResourceIT { final String dbId = atlasClientV1.createEntity(entity).get(0); sendHookMessage(new EntityDeleteRequest(TEST_USER, DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName)); + waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { Referenceable getEntity = atlasClientV1.getEntity(dbId); + return getEntity.getId().getState() == Id.EntityState.DELETED; } }); @@ -193,8 +214,9 @@ public class NotificationHookConsumerIT extends BaseResourceIT { @Test public void testUpdateEntityFullUpdate() throws Exception { - Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN); - final String dbName = "db" + randomString(); + final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN); + final String dbName = "db" + randomString(); + entity.set(NAME, dbName); entity.set(DESCRIPTION, randomString()); entity.set(QUALIFIED_NAME, dbName); @@ -203,6 +225,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT { atlasClientV1.createEntity(entity); final Referenceable newEntity = new Referenceable(DATABASE_TYPE_BUILTIN); + newEntity.set(NAME, randomString()); newEntity.set(DESCRIPTION, randomString()); newEntity.set("owner", randomString()); @@ -211,18 +234,19 @@ public class NotificationHookConsumerIT extends BaseResourceIT { //updating unique attribute sendHookMessage(new EntityUpdateRequest(TEST_USER, newEntity)); + waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, newEntity.get(QUALIFIED_NAME))); + return results.length() == 1; } }); Referenceable actualEntity = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName); + assertEquals(actualEntity.get(DESCRIPTION), newEntity.get(DESCRIPTION)); assertEquals(actualEntity.get("owner"), newEntity.get("owner")); } - - } http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java index eb37fa8..4ea13c7 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java @@ -25,14 +25,15 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.kafka.AtlasKafkaMessage; import org.apache.atlas.kafka.KafkaNotification; import org.apache.atlas.kafka.NotificationProvider; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.atlas.model.notification.HookNotification; +import org.apache.atlas.v1.model.instance.Referenceable; +import org.apache.atlas.v1.model.notification.HookNotificationV1; import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.v1.EntityStream; import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.web.service.ServiceState; import org.apache.commons.lang.RandomStringUtils; import org.mockito.Mock; @@ -41,7 +42,7 @@ import org.testng.Assert; import org.testng.annotations.AfterTest; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; -import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; + import java.util.List; import org.apache.atlas.kafka.AtlasKafkaConsumer; @@ -57,11 +58,11 @@ import static org.testng.Assert.*; public class NotificationHookConsumerKafkaTest { - - public static final String NAME = "name"; - public static final String DESCRIPTION = "description"; + public static final String NAME = "name"; + public static final String DESCRIPTION = "description"; public static final String QUALIFIED_NAME = "qualifiedName"; - private NotificationInterface notificationInterface = NotificationProvider.get(); + + private final NotificationInterface notificationInterface = NotificationProvider.get(); @Mock @@ -81,10 +82,14 @@ public class NotificationHookConsumerKafkaTest { @BeforeTest public void setup() throws AtlasException, InterruptedException, AtlasBaseException { MockitoAnnotations.initMocks(this); - AtlasType mockType = mock(AtlasType.class); + + AtlasType mockType = mock(AtlasType.class); + AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntitiesWithExtInfo.class); + when(typeRegistry.getType(anyString())).thenReturn(mockType); - AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class); + when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity); + kafkaNotification = startKafkaServer(); } @@ -97,19 +102,20 @@ public class NotificationHookConsumerKafkaTest { @Test public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException, AtlasBaseException { try { - produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity())); + produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity())); - NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, false); - NotificationHookConsumer notificationHookConsumer = - new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); - NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); + NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, false); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); consumeOneMessage(consumer, hookConsumer); + verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean()); // produce another message, and make sure it moves ahead. If commit succeeded, this would work. - produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity())); + produceMessage(new HookNotificationV1.EntityCreateRequest("test_user2", createEntity())); consumeOneMessage(consumer, hookConsumer); + verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean()); reset(atlasEntityStore); } @@ -121,22 +127,20 @@ public class NotificationHookConsumerKafkaTest { @Test(dependsOnMethods = "testConsumerConsumesNewMessageWithAutoCommitDisabled") public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception { try { - produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity())); + produceMessage(new HookNotificationV1.EntityCreateRequest("test_user3", createEntity())); - NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, true); + NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, true); assertNotNull (consumer); - NotificationHookConsumer notificationHookConsumer = - new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); - NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); - + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); consumeOneMessage(consumer, hookConsumer); verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean()); // produce another message, but this will not be consumed, as commit code is not executed in hook consumer. - produceMessage(new HookNotification.EntityCreateRequest("test_user4", createEntity())); + produceMessage(new HookNotificationV1.EntityCreateRequest("test_user4", createEntity())); consumeOneMessage(consumer, hookConsumer); verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean()); @@ -146,18 +150,19 @@ public class NotificationHookConsumerKafkaTest { } } - AtlasKafkaConsumer<HookNotificationMessage> createNewConsumer(KafkaNotification kafkaNotification, boolean autoCommitEnabled) { + AtlasKafkaConsumer<HookNotification> createNewConsumer(KafkaNotification kafkaNotification, boolean autoCommitEnabled) { return (AtlasKafkaConsumer) kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0); } - void consumeOneMessage(NotificationConsumer<HookNotificationMessage> consumer, + void consumeOneMessage(NotificationConsumer<HookNotification> consumer, NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException { try { long startTime = System.currentTimeMillis(); //fetch starting time + while ((System.currentTimeMillis() - startTime) < 10000) { - List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(); + List<AtlasKafkaMessage<HookNotification>> messages = consumer.receive(); - for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) { + for (AtlasKafkaMessage<HookNotification> msg : messages) { hookConsumer.handleMessage(msg); } @@ -172,19 +177,25 @@ public class NotificationHookConsumerKafkaTest { Referenceable createEntity() { final Referenceable entity = new Referenceable(AtlasClient.DATA_SET_SUPER_TYPE); + entity.set(NAME, "db" + randomString()); entity.set(DESCRIPTION, randomString()); entity.set(QUALIFIED_NAME, randomString()); + return entity; } KafkaNotification startKafkaServer() throws AtlasException, InterruptedException { Configuration applicationProperties = ApplicationProperties.get(); + applicationProperties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5)); kafkaNotification = new KafkaNotification(applicationProperties); + kafkaNotification.start(); + Thread.sleep(2000); + return kafkaNotification; } @@ -192,8 +203,7 @@ public class NotificationHookConsumerKafkaTest { return RandomStringUtils.randomAlphanumeric(10); } - private void produceMessage(HookNotificationMessage message) throws NotificationException { + private void produceMessage(HookNotification message) throws NotificationException { kafkaNotification.send(NotificationInterface.NotificationType.HOOK, message); } - } http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java index e2d1022..f8bd9a1 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java @@ -22,15 +22,17 @@ import org.apache.atlas.AtlasServiceException; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.kafka.AtlasKafkaMessage; -import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.EntityMutationResponse; -import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.model.notification.HookNotification.HookNotificationType; +import org.apache.atlas.notification.NotificationInterface.NotificationType; +import org.apache.atlas.v1.model.instance.Referenceable; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.v1.EntityStream; import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.web.service.ServiceState; import org.apache.commons.configuration.Configuration; import org.apache.kafka.common.TopicPartition; @@ -43,6 +45,7 @@ import org.testng.annotations.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; @@ -75,20 +78,24 @@ public class NotificationHookConsumerTest { @BeforeMethod public void setup() throws AtlasBaseException { MockitoAnnotations.initMocks(this); - AtlasType mockType = mock(AtlasType.class); + + AtlasType mockType = mock(AtlasType.class); + AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntitiesWithExtInfo.class); + when(typeRegistry.getType(anyString())).thenReturn(mockType); - AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class); when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity); + EntityMutationResponse mutationResponse = mock(EntityMutationResponse.class); + when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenReturn(mutationResponse); } @Test public void testConsumerCanProceedIfServerIsReady() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); - NotificationHookConsumer.HookConsumer hookConsumer = - notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); - NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); + NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); assertTrue(hookConsumer.serverAvailable(timer)); @@ -98,10 +105,9 @@ public class NotificationHookConsumerTest { @Test public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); - NotificationHookConsumer.HookConsumer hookConsumer = - notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); - NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); + NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); when(serviceState.getState()) .thenReturn(ServiceState.ServiceStateValue.PASSIVE) @@ -116,35 +122,30 @@ public class NotificationHookConsumerTest { @Test public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException { - NotificationHookConsumer notificationHookConsumer = - new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); - NotificationConsumer consumer = mock(NotificationConsumer.class); - NotificationHookConsumer.HookConsumer hookConsumer = - notificationHookConsumer.new HookConsumer(consumer); - HookNotification.EntityCreateRequest message = mock(HookNotification.EntityCreateRequest.class); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationConsumer consumer = mock(NotificationConsumer.class); + NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); + EntityCreateRequest message = mock(EntityCreateRequest.class); + Referenceable mock = mock(Referenceable.class); + when(message.getUser()).thenReturn("user"); - when(message.getType()).thenReturn(HookNotification.HookNotificationType.ENTITY_CREATE); - Referenceable mock = mock(Referenceable.class); + when(message.getType()).thenReturn(HookNotificationType.ENTITY_CREATE); when(message.getEntities()).thenReturn(Arrays.asList(mock)); hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1)); + verify(consumer).commit(any(TopicPartition.class), anyInt()); } @Test public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException, AtlasBaseException { - NotificationHookConsumer notificationHookConsumer = - new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); - NotificationConsumer consumer = mock(NotificationConsumer.class); - NotificationHookConsumer.HookConsumer hookConsumer = - notificationHookConsumer.new HookConsumer(consumer); - HookNotification.EntityCreateRequest message = new HookNotification.EntityCreateRequest("user", - new ArrayList<Referenceable>() { - { - add(mock(Referenceable.class)); - } - }); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationConsumer consumer = mock(NotificationConsumer.class); + NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); + EntityCreateRequest message = new EntityCreateRequest("user", Collections.singletonList(mock(Referenceable.class))); + when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenThrow(new RuntimeException("Simulating exception in processing message")); + hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1)); verifyZeroInteractions(consumer); @@ -152,10 +153,10 @@ public class NotificationHookConsumerTest { @Test public void testConsumerProceedsWithFalseIfInterrupted() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); - NotificationHookConsumer.HookConsumer hookConsumer = - notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); - NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); + NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); + doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS); when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); @@ -164,58 +165,75 @@ public class NotificationHookConsumerTest { @Test public void testConsumersStartedIfHAIsDisabled() throws Exception { + List<NotificationConsumer<Object>> consumers = new ArrayList(); + NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class); + + consumers.add(notificationConsumerMock); + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); - List<NotificationConsumer<Object>> consumers = new ArrayList(); - consumers.add(mock(NotificationConsumer.class)); - when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). - thenReturn(consumers); + when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + notificationHookConsumer.startInternal(configuration, executorService); - verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1); + + verify(notificationInterface).createConsumers(NotificationType.HOOK, 1); verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class)); } @Test public void testConsumersAreNotStartedIfHAIsEnabled() throws Exception { + List<NotificationConsumer<Object>> consumers = new ArrayList(); + NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class); + + consumers.add(notificationConsumerMock); + when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); - List<NotificationConsumer<Object>> consumers = new ArrayList(); - consumers.add(mock(NotificationConsumer.class)); - when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). - thenReturn(consumers); + when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + notificationHookConsumer.startInternal(configuration, executorService); + verifyZeroInteractions(notificationInterface); } @Test public void testConsumersAreStartedWhenInstanceBecomesActive() throws Exception { + List<NotificationConsumer<Object>> consumers = new ArrayList(); + NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class); + + consumers.add(notificationConsumerMock); + when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); - List<NotificationConsumer<Object>> consumers = new ArrayList(); - consumers.add(mock(NotificationConsumer.class)); - when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). - thenReturn(consumers); + when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.instanceIsActive(); - verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1); + + verify(notificationInterface).createConsumers(NotificationType.HOOK, 1); verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class)); } @Test public void testConsumersAreStoppedWhenInstanceBecomesPassive() throws Exception { + List<NotificationConsumer<Object>> consumers = new ArrayList(); + NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class); + + consumers.add(notificationConsumerMock); + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); - List<NotificationConsumer<Object>> consumers = new ArrayList(); - NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class); - consumers.add(notificationConsumerMock); + when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).thenReturn(consumers); final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); doAnswer(new Answer() { @@ -223,12 +241,14 @@ public class NotificationHookConsumerTest { public Object answer(InvocationOnMock invocationOnMock) throws Throwable { notificationHookConsumer.consumers.get(0).start(); Thread.sleep(500); + return null; } }).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class)); notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.instanceIsPassive(); + verify(notificationInterface).close(); verify(executorService).shutdown(); verify(notificationConsumerMock).wakeup(); @@ -236,18 +256,21 @@ public class NotificationHookConsumerTest { @Test public void consumersStoppedBeforeStarting() throws Exception { + List<NotificationConsumer<Object>> consumers = new ArrayList(); + NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class); + + consumers.add(notificationConsumerMock); + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); - List<NotificationConsumer<Object>> consumers = new ArrayList(); - NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class); - consumers.add(notificationConsumerMock); + when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).thenReturn(consumers); final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.instanceIsPassive(); + verify(notificationInterface).close(); verify(executorService).shutdown(); } @@ -261,13 +284,16 @@ public class NotificationHookConsumerTest { public Object answer(InvocationOnMock invocationOnMock) throws Throwable { notificationHookConsumer.consumers.get(0).start(); Thread.sleep(1000); + return null; } }).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class)); notificationHookConsumer.startInternal(configuration, executorService); Thread.sleep(1000); + assertTrue(notificationHookConsumer.consumers.get(0).isAlive()); + notificationHookConsumer.consumers.get(0).shutdown(); } @@ -280,27 +306,32 @@ public class NotificationHookConsumerTest { public Object answer(InvocationOnMock invocationOnMock) throws Throwable { notificationHookConsumer.consumers.get(0).start(); Thread.sleep(500); + return null; } }).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class)); notificationHookConsumer.startInternal(configuration, executorService); Thread.sleep(500); + notificationHookConsumer.consumers.get(0).shutdown(); Thread.sleep(500); + assertFalse(notificationHookConsumer.consumers.get(0).isAlive()); } private NotificationHookConsumer setupNotificationHookConsumer() throws AtlasException { + List<NotificationConsumer<Object>> consumers = new ArrayList(); + NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class); + + consumers.add(notificationConsumerMock); + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); - List<NotificationConsumer<Object>> consumers = new ArrayList(); - NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class); when(notificationConsumerMock.receive()).thenThrow(new IllegalStateException()); - consumers.add(notificationConsumerMock); + when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).thenReturn(consumers); return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/test/java/org/apache/atlas/util/RestUtilsTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/util/RestUtilsTest.java b/webapp/src/test/java/org/apache/atlas/util/RestUtilsTest.java index 87259df..0d4af1e 100644 --- a/webapp/src/test/java/org/apache/atlas/util/RestUtilsTest.java +++ b/webapp/src/test/java/org/apache/atlas/util/RestUtilsTest.java @@ -22,12 +22,10 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; +import java.util.*; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.typedef.AtlasBaseTypeDef; import org.apache.atlas.model.typedef.AtlasEntityDef; import org.apache.atlas.model.typedef.AtlasStructDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; @@ -44,24 +42,12 @@ import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeRegistry.AtlasTransientTypeRegistry; -import org.apache.atlas.typesystem.TypesDef; -import org.apache.atlas.typesystem.json.TypesSerialization; -import org.apache.atlas.typesystem.types.AttributeDefinition; -import org.apache.atlas.typesystem.types.ClassType; -import org.apache.atlas.typesystem.types.DataTypes; import org.apache.atlas.typesystem.types.DataTypes.TypeCategory; -import org.apache.atlas.typesystem.types.EnumTypeDefinition; -import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition; -import org.apache.atlas.typesystem.types.Multiplicity; -import org.apache.atlas.typesystem.types.StructTypeDefinition; -import org.apache.atlas.typesystem.types.TraitType; -import org.apache.atlas.typesystem.types.utils.TypesUtil; +import org.apache.atlas.v1.model.typedef.*; +import org.apache.atlas.v1.typesystem.types.utils.TypesUtil; import org.testng.Assert; import org.testng.annotations.Test; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; - /** * Validates that conversion from V1 to legacy types (and back) is consistent. This also tests * that the conversion logic in AtlasStructDefStoreV1 is consistent with the conversion logic @@ -76,13 +62,13 @@ public class RestUtilsTest { // in tables attribute in "database" type is lost. See ATLAS-1528. public void testBidirectonalCompositeMappingConsistent() throws AtlasBaseException { - HierarchicalTypeDefinition<ClassType> dbV1Type = TypesUtil.createClassTypeDef("database", - ImmutableSet.<String> of(), new AttributeDefinition("tables", DataTypes.arrayTypeName("table"), - Multiplicity.OPTIONAL, true, "containingDatabase")); + ClassTypeDefinition dbV1Type = TypesUtil.createClassTypeDef("database", "", Collections.emptySet(), + new AttributeDefinition("tables", AtlasBaseTypeDef.getArrayTypeName("table"), + Multiplicity.OPTIONAL, true, "containingDatabase")); - HierarchicalTypeDefinition<ClassType> tableV1Type = TypesUtil.createClassTypeDef("table", - ImmutableSet.<String> of(), - new AttributeDefinition("containingDatabase", "database", Multiplicity.OPTIONAL, false, "tables")); + ClassTypeDefinition tableV1Type = TypesUtil.createClassTypeDef("table", "", Collections.emptySet(), + new AttributeDefinition("containingDatabase", "database", + Multiplicity.OPTIONAL, false, "tables")); testV1toV2toV1Conversion(Arrays.asList(dbV1Type, tableV1Type), new boolean[] { true, false }); } @@ -92,121 +78,118 @@ public class RestUtilsTest { // "containingDatabase" is lost // in "table" attribute in "database". See ATLAS-1528. public void testBidirectonalNonCompositeMappingConsistent() throws AtlasBaseException { + ClassTypeDefinition dbV1Type = TypesUtil.createClassTypeDef("database", "", Collections.emptySet(), + new AttributeDefinition("tables", AtlasBaseTypeDef.getArrayTypeName("table"), + Multiplicity.OPTIONAL, false, "containingDatabase")); - HierarchicalTypeDefinition<ClassType> dbV1Type = TypesUtil.createClassTypeDef("database", - ImmutableSet.<String> of(), new AttributeDefinition("tables", DataTypes.arrayTypeName("table"), - Multiplicity.OPTIONAL, false, "containingDatabase")); - - HierarchicalTypeDefinition<ClassType> tableV1Type = TypesUtil.createClassTypeDef("table", - ImmutableSet.<String> of(), - new AttributeDefinition("containingDatabase", "database", Multiplicity.OPTIONAL, false, "tables")); + ClassTypeDefinition tableV1Type = TypesUtil.createClassTypeDef("table", "", Collections.emptySet(), + new AttributeDefinition("containingDatabase", "database", + Multiplicity.OPTIONAL, false, "tables")); testV1toV2toV1Conversion(Arrays.asList(dbV1Type, tableV1Type), new boolean[] { false, false }); } private AtlasTypeDefGraphStoreV1 makeTypeStore(AtlasTypeRegistry reg) { - AtlasTypeDefGraphStoreV1 result = mock(AtlasTypeDefGraphStoreV1.class); for (AtlasEntityType type : reg.getAllEntityTypes()) { - String typeName = type.getTypeName(); + String typeName = type.getTypeName(); AtlasVertex typeVertex = mock(AtlasVertex.class); + when(result.isTypeVertex(eq(typeVertex), any(TypeCategory.class))).thenReturn(true); - when(typeVertex.getProperty(eq(Constants.TYPE_CATEGORY_PROPERTY_KEY), eq(TypeCategory.class))) - .thenReturn(TypeCategory.CLASS); + when(typeVertex.getProperty(eq(Constants.TYPE_CATEGORY_PROPERTY_KEY), eq(TypeCategory.class))).thenReturn(TypeCategory.CLASS); String attributeListPropertyKey = AtlasGraphUtilsV1.getTypeDefPropertyKey(typeName); - when(typeVertex.getProperty(eq(attributeListPropertyKey), eq(List.class))) - .thenReturn(new ArrayList<>(type.getAllAttributes().keySet())); + + when(typeVertex.getProperty(eq(attributeListPropertyKey), eq(List.class))).thenReturn(new ArrayList<>(type.getAllAttributes().keySet())); + for (AtlasAttribute attribute : type.getAllAttributes().values()) { String attributeDefPropertyKey = AtlasGraphUtilsV1.getTypeDefPropertyKey(typeName, attribute.getName()); - String attributeJson = AtlasStructDefStoreV1.toJsonFromAttribute(attribute); + String attributeJson = AtlasStructDefStoreV1.toJsonFromAttribute(attribute); + when(typeVertex.getProperty(eq(attributeDefPropertyKey), eq(String.class))).thenReturn(attributeJson); } + when(result.findTypeVertexByName(eq(typeName))).thenReturn(typeVertex); } + return result; } - private AtlasAttributeDef convertToJsonAndBack(AtlasTypeRegistry registry, AtlasStructDef structDef, - AtlasAttributeDef attributeDef, boolean compositeExpected) throws AtlasBaseException { - + private AtlasAttributeDef convertToJsonAndBack(AtlasTypeRegistry registry, AtlasStructDef structDef, AtlasAttributeDef attributeDef, boolean compositeExpected) throws AtlasBaseException { AtlasTypeDefGraphStoreV1 typeDefStore = makeTypeStore(registry); - AtlasStructType structType = (AtlasStructType) registry.getType(structDef.getName()); - AtlasAttribute attribute = structType.getAttribute(attributeDef.getName()); - String attribJson = AtlasStructDefStoreV1.toJsonFromAttribute(attribute); + AtlasStructType structType = (AtlasStructType) registry.getType(structDef.getName()); + AtlasAttribute attribute = structType.getAttribute(attributeDef.getName()); + String attribJson = AtlasStructDefStoreV1.toJsonFromAttribute(attribute); + Map attrInfo = AtlasType.fromJson(attribJson, Map.class); - Map attrInfo = AtlasType.fromJson(attribJson, Map.class); Assert.assertEquals(attrInfo.get("isComposite"), compositeExpected); + return AtlasStructDefStoreV1.toAttributeDefFromJson(structDef, attrInfo, typeDefStore); } - private void testV1toV2toV1Conversion(List<HierarchicalTypeDefinition<ClassType>> typesToTest, - boolean[] compositeExpected) throws AtlasBaseException { - + private void testV1toV2toV1Conversion(List<ClassTypeDefinition> typesToTest, boolean[] compositeExpected) throws AtlasBaseException { List<AtlasEntityDef> convertedEntityDefs = convertV1toV2(typesToTest); + AtlasTypeRegistry registry = createRegistry(convertedEntityDefs); - AtlasTypeRegistry registry = createRegistry(convertedEntityDefs); for(int i = 0 ; i < convertedEntityDefs.size(); i++) { AtlasEntityDef def = convertedEntityDefs.get(i); + for (AtlasAttributeDef attrDef : def.getAttributeDefs()) { AtlasAttributeDef converted = convertToJsonAndBack(registry, def, attrDef, compositeExpected[i]); + Assert.assertEquals(converted, attrDef); } } - List<HierarchicalTypeDefinition<ClassType>> convertedBackTypeDefs = convertV2toV1(convertedEntityDefs); + List<ClassTypeDefinition> convertedBackTypeDefs = convertV2toV1(convertedEntityDefs); for (int i = 0; i < typesToTest.size(); i++) { + ClassTypeDefinition convertedBack = convertedBackTypeDefs.get(i); - HierarchicalTypeDefinition<ClassType> convertedBack = convertedBackTypeDefs.get(i); Assert.assertEquals(convertedBack, typesToTest.get(i)); - AttributeDefinition[] attributeDefinitions = convertedBack.attributeDefinitions; - if (attributeDefinitions.length > 0) { - Assert.assertEquals(attributeDefinitions[0].isComposite, compositeExpected[i]); + + List<AttributeDefinition> attributeDefinitions = convertedBack.getAttributeDefinitions(); + + if (attributeDefinitions.size() > 0) { + Assert.assertEquals(attributeDefinitions.get(0).getIsComposite(), compositeExpected[i]); } } - } - private List<HierarchicalTypeDefinition<ClassType>> convertV2toV1(List<AtlasEntityDef> toConvert) - throws AtlasBaseException { - - AtlasTypeRegistry reg = createRegistry(toConvert); + private List<ClassTypeDefinition> convertV2toV1(List<AtlasEntityDef> toConvert) throws AtlasBaseException { + AtlasTypeRegistry reg = createRegistry(toConvert); + List<ClassTypeDefinition> result = new ArrayList<>(toConvert.size()); - List<HierarchicalTypeDefinition<ClassType>> result = new ArrayList<>(toConvert.size()); for (int i = 0; i < toConvert.size(); i++) { - AtlasEntityDef entityDef = toConvert.get(i); - AtlasEntityType entity = reg.getEntityTypeByName(entityDef.getName()); - HierarchicalTypeDefinition<ClassType> converted = TypeConverterUtil.toTypesDef(entity, reg) - .classTypesAsJavaList().get(0); + AtlasEntityDef entityDef = toConvert.get(i); + AtlasEntityType entity = reg.getEntityTypeByName(entityDef.getName()); + ClassTypeDefinition converted = TypeConverterUtil.toTypesDef(entity, reg).getClassTypes().get(0); + result.add(converted); } + return result; } private AtlasTypeRegistry createRegistry(List<AtlasEntityDef> toConvert) throws AtlasBaseException { - AtlasTypeRegistry reg = new AtlasTypeRegistry(); + AtlasTypeRegistry reg = new AtlasTypeRegistry(); AtlasTransientTypeRegistry tmp = reg.lockTypeRegistryForUpdate(); + tmp.addTypes(toConvert); reg.releaseTypeRegistryForUpdate(tmp, true); + return reg; } - private List<AtlasEntityDef> convertV1toV2(List<HierarchicalTypeDefinition<ClassType>> types) - throws AtlasBaseException { - - ImmutableList<HierarchicalTypeDefinition<ClassType>> classTypeList = ImmutableList - .<HierarchicalTypeDefinition<ClassType>> builder().addAll(types).build(); - - TypesDef toConvert = TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition> of(), - ImmutableList.<StructTypeDefinition> of(), ImmutableList.<HierarchicalTypeDefinition<TraitType>> of(), - classTypeList); + private List<AtlasEntityDef> convertV1toV2(List<ClassTypeDefinition> types) throws AtlasBaseException { + List<ClassTypeDefinition> classTypeList = new ArrayList(types); + TypesDef toConvert = new TypesDef(Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), classTypeList); + String json = AtlasType.toV1Json(toConvert); + AtlasTypeRegistry emptyRegistry = new AtlasTypeRegistry(); + AtlasTypesDef converted = TypeConverterUtil.toAtlasTypesDef(json, emptyRegistry); + List<AtlasEntityDef> convertedEntityDefs = converted.getEntityDefs(); - String json = TypesSerialization.toJson(toConvert); - AtlasTypeRegistry emptyRegistry = new AtlasTypeRegistry(); - AtlasTypesDef converted = TypeConverterUtil.toAtlasTypesDef(json, emptyRegistry); - List<AtlasEntityDef> convertedEntityDefs = converted.getEntityDefs(); return convertedEntityDefs; } }