ATLAS-716 Entity update/delete notifications (shwethags)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/705014eb Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/705014eb Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/705014eb Branch: refs/heads/master Commit: 705014eb3352180ff8f2cac05dbbc0809b421d8c Parents: 153fc36 Author: Shwetha GS <[email protected]> Authored: Thu May 26 00:02:32 2016 +0530 Committer: Shwetha GS <[email protected]> Committed: Thu May 26 00:04:22 2016 +0530 ---------------------------------------------------------------------- .../atlas/hive/bridge/HiveMetaStoreBridge.java | 2 +- .../org/apache/atlas/hive/hook/HiveHookIT.java | 65 +++-- .../main/java/org/apache/atlas/AtlasClient.java | 112 ++++++-- .../java/org/apache/atlas/EntityAuditEvent.java | 40 ++- .../src/main/java/org/apache/atlas/SerDe.java | 79 ++++++ .../java/org/apache/atlas/AtlasClientTest.java | 26 ++ .../atlas/notification/MessageVersion.java | 5 + .../notification/NotificationInterface.java | 2 +- .../VersionedMessageDeserializer.java | 12 +- .../NotificationEntityChangeListener.java | 106 ------- .../AbstractNotificationConsumerTest.java | 14 +- release-log.txt | 1 + .../atlas/repository/MetadataRepository.java | 8 +- .../repository/audit/EntityAuditListener.java | 5 +- .../audit/HBaseBasedAuditRepository.java | 3 + .../atlas/repository/graph/DeleteHandler.java | 136 +++++---- .../graph/GraphBackedMetadataRepository.java | 22 +- .../atlas/repository/graph/GraphHelper.java | 59 ++-- .../graph/GraphToTypedInstanceMapper.java | 13 +- .../repository/graph/HardDeleteHandler.java | 8 +- .../repository/graph/SoftDeleteHandler.java | 32 ++- .../graph/TypedInstanceToGraphMapper.java | 181 ++++++------ .../typestore/GraphBackedTypeStore.java | 21 +- .../atlas/services/DefaultMetadataService.java | 72 ++--- .../audit/AuditRepositoryTestBase.java | 10 +- ...hBackedMetadataRepositoryDeleteTestBase.java | 221 +++++++++------ .../GraphBackedRepositoryHardDeleteTest.java | 63 +++- .../GraphBackedRepositorySoftDeleteTest.java | 99 ++++++- .../service/DefaultMetadataServiceTest.java | 284 ++++++++++++------- .../java/org/apache/atlas/RequestContext.java | 19 +- .../apache/atlas/services/MetadataService.java | 21 +- .../apache/atlas/typesystem/Referenceable.java | 2 +- .../java/org/apache/atlas/LocalAtlasClient.java | 22 +- .../NotificationEntityChangeListener.java | 161 +++++++++++ .../atlas/web/resources/EntityResource.java | 97 +++---- .../apache/atlas/web/service/ServiceModule.java | 2 +- .../org/apache/atlas/LocalAtlasClientTest.java | 21 +- .../notification/EntityNotificationIT.java | 2 - .../NotificationEntityChangeListenerTest.java | 90 ++++++ .../web/resources/EntityJerseyResourceIT.java | 48 ++-- .../atlas/web/service/CuratorFactoryTest.java | 5 - 41 files changed, 1443 insertions(+), 748 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java index 254e150..fe07d73 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java @@ -188,7 +188,7 @@ public class HiveMetaStoreBridge { List<String> guids = getAtlasClient().createEntity(entityJSON); LOG.debug("created instance for type " + typeName + ", guid: " + guids); - return new Referenceable(guids.get(0), referenceable.getTypeName(), null); + return new Referenceable(guids.get(guids.size() - 1), referenceable.getTypeName(), null); } /** http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java index 70100f1..84d9a52 100755 --- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java +++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java @@ -29,7 +29,6 @@ import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; import org.apache.atlas.hive.model.HiveDataModelGenerator; import org.apache.atlas.hive.model.HiveDataTypes; import org.apache.atlas.hive.rewrite.HiveASTRewriter; -import org.apache.atlas.hive.rewrite.RewriteException; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Struct; import org.apache.atlas.typesystem.persistence.Id; @@ -66,6 +65,7 @@ import java.util.Map; import static org.apache.atlas.hive.hook.HiveHook.lower; import static org.apache.atlas.hive.hook.HiveHook.normalize; +import static org.apache.atlas.hive.model.HiveDataModelGenerator.NAME; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.fail; @@ -197,8 +197,8 @@ public class HiveHookIT { Assert.assertEquals(tableRef.get(HiveDataModelGenerator.TABLE_TYPE_ATTR), TableType.MANAGED_TABLE.name()); Assert.assertEquals(tableRef.get(HiveDataModelGenerator.COMMENT), "table comment"); String entityName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName); - Assert.assertEquals(tableRef.get(HiveDataModelGenerator.NAME), entityName); - Assert.assertEquals(tableRef.get(HiveDataModelGenerator.NAME), "default." + tableName.toLowerCase() + "@" + CLUSTER_NAME); + Assert.assertEquals(tableRef.get(NAME), entityName); + Assert.assertEquals(tableRef.get(NAME), "default." + tableName.toLowerCase() + "@" + CLUSTER_NAME); Table t = hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, tableName); long createTime = Long.parseLong(t.getMetadata().getProperty(hive_metastoreConstants.DDL_TIME)) * HiveMetaStoreBridge.MILLIS_CONVERT_FACTOR; @@ -631,7 +631,7 @@ public class HiveHookIT { final String newDBName = createDatabase(); assertTableIsRegistered(DEFAULT_DB, tableName); - String columnGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), HiveDataModelGenerator.NAME)); + String columnGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), NAME)); String sdGuid = assertSDIsRegistered(HiveMetaStoreBridge.getStorageDescQFName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName)), null); assertDatabaseIsRegistered(newDBName); @@ -649,10 +649,10 @@ public class HiveHookIT { String query = String.format("alter table %s rename to %s", DEFAULT_DB + "." + tableName, newDBName + "." + newTableName); runCommand(query); - String newColGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, newTableName), HiveDataModelGenerator.NAME)); + String newColGuid = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, newTableName), NAME)); Assert.assertEquals(newColGuid, columnGuid); - assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, tableName), HiveDataModelGenerator.NAME)); + assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, tableName), NAME)); assertTrait(columnGuid, colTraitDetails); String newSdGuid = assertSDIsRegistered(HiveMetaStoreBridge.getStorageDescQFName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, newDBName, newTableName)), null); @@ -676,7 +676,16 @@ public class HiveHookIT { private List<Referenceable> getColumns(String dbName, String tableName) throws Exception { String tableId = assertTableIsRegistered(dbName, tableName); Referenceable tableRef = atlasClient.getEntity(tableId); - return ((List<Referenceable>)tableRef.get(HiveDataModelGenerator.COLUMNS)); + + //with soft delete, the deleted columns are returned as well. So, filter the deleted ones + List<Referenceable> columns = ((List<Referenceable>) tableRef.get(HiveDataModelGenerator.COLUMNS)); + List<Referenceable> activeColumns = new ArrayList<>(); + for (Referenceable col : columns) { + if (col.getId().getState() == Id.EntityState.ACTIVE) { + activeColumns.add(col); + } + } + return activeColumns; } @@ -723,21 +732,15 @@ public class HiveHookIT { colDropped)); //Verify the number of columns present in the table - assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() { - @Override - public void assertOnEntity(Referenceable tableRef) throws Exception { - List<Referenceable> columns = (List<Referenceable>) tableRef.get(HiveDataModelGenerator.COLUMNS); - Assert.assertEquals(columns.size(), 1); - Assert.assertEquals(columns.get(0).get(HiveDataModelGenerator.NAME), HiveDataModelGenerator.NAME); - - } - }); + List<Referenceable> columns = getColumns(DEFAULT_DB, tableName); + assertEquals(columns.size(), 1); + assertEquals(columns.get(0).get(NAME), "name"); } @Test public void testAlterTableChangeColumn() throws Exception { //Change name - String oldColName = HiveDataModelGenerator.NAME; + String oldColName = NAME; String newColName = "name1"; String tableName = createTable(); String query = String.format("alter table %s change %s %s string", tableName, oldColName, newColName); @@ -818,8 +821,8 @@ public class HiveHookIT { @Override public void assertOnEntity(Referenceable entity) throws Exception { List<Referenceable> columns = (List<Referenceable>) entity.get(HiveDataModelGenerator.COLUMNS); - assertEquals(columns.get(0).get(HiveDataModelGenerator.NAME), finalNewColName); - assertEquals(columns.get(1).get(HiveDataModelGenerator.NAME), "id"); + assertEquals(columns.get(0).get(NAME), finalNewColName); + assertEquals(columns.get(1).get(NAME), "id"); } } ); @@ -846,8 +849,8 @@ public class HiveHookIT { @Override public void assertOnEntity(Referenceable entity) throws Exception { List<Referenceable> columns = (List<Referenceable>) entity.get(HiveDataModelGenerator.COLUMNS); - assertEquals(columns.get(1).get(HiveDataModelGenerator.NAME), finalNewColName2); - assertEquals(columns.get(0).get(HiveDataModelGenerator.NAME), "id"); + assertEquals(columns.get(1).get(NAME), finalNewColName2); + assertEquals(columns.get(0).get(NAME), "id"); } } ); @@ -955,7 +958,7 @@ public class HiveHookIT { Referenceable hdfsPathRef = atlasClient.getEntity(hdfsPathId); Assert.assertEquals(hdfsPathRef.get("path"), testPathNormed); - Assert.assertEquals(hdfsPathRef.get(HiveDataModelGenerator.NAME), testPathNormed); + Assert.assertEquals(hdfsPathRef.get(NAME), testPathNormed); // Assert.assertEquals(hdfsPathRef.get("name"), new Path(testPath).getName()); Assert.assertEquals(hdfsPathRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), testPathNormed); @@ -964,7 +967,7 @@ public class HiveHookIT { private String assertHDFSPathIsRegistered(String path) throws Exception { LOG.debug("Searching for hdfs path {}", path); - return assertEntityIsRegistered(FSDataTypes.HDFS_PATH().toString(), HiveDataModelGenerator.NAME, path, null); + return assertEntityIsRegistered(FSDataTypes.HDFS_PATH().toString(), NAME, path, null); } @Test @@ -1014,7 +1017,7 @@ public class HiveHookIT { ImmutableList<String> cols = ImmutableList.of("id"); runBucketSortQuery(tableName, 5, cols, cols); - cols = ImmutableList.of("id", HiveDataModelGenerator.NAME); + cols = ImmutableList.of("id", NAME); runBucketSortQuery(tableName, 2, cols, cols); } @@ -1077,7 +1080,7 @@ public class HiveHookIT { assertTableIsRegistered(DEFAULT_DB, tableName); assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "id")); - assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), HiveDataModelGenerator.NAME)); + assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), NAME)); final String query = String.format("drop table %s ", tableName); runCommand(query); @@ -1086,7 +1089,7 @@ public class HiveHookIT { "id")); assertColumnIsNotRegistered(HiveMetaStoreBridge .getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), - HiveDataModelGenerator.NAME)); + NAME)); assertTableIsNotRegistered(DEFAULT_DB, tableName); } @@ -1110,7 +1113,7 @@ public class HiveHookIT { HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]), "id")); assertColumnIsNotRegistered(HiveMetaStoreBridge .getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]), - HiveDataModelGenerator.NAME)); + NAME)); for(int i = 0; i < numTables; i++) { assertTableIsNotRegistered(dbName, tableNames[i]); @@ -1175,7 +1178,7 @@ public class HiveHookIT { assertTableIsRegistered(DEFAULT_DB, viewName); assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), "id")); - assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), HiveDataModelGenerator.NAME)); + assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), NAME)); query = String.format("drop view %s ", viewName); @@ -1185,7 +1188,7 @@ public class HiveHookIT { "id")); assertColumnIsNotRegistered(HiveMetaStoreBridge .getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), - HiveDataModelGenerator.NAME)); + NAME)); assertTableIsNotRegistered(DEFAULT_DB, viewName); } @@ -1349,7 +1352,7 @@ public class HiveHookIT { if (inputTblName != null) { Referenceable inputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{ - put(HiveDataModelGenerator.NAME, inputTblName); + put(NAME, inputTblName); }}); inputs = new ArrayList<Referenceable>(); inputs.add(inputTableRef); @@ -1357,7 +1360,7 @@ public class HiveHookIT { List<Referenceable> outputs = null; if (outputTblName != null) { Referenceable outputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{ - put(HiveDataModelGenerator.NAME, outputTblName); + put(NAME, outputTblName); }}); outputs = new ArrayList<Referenceable>(); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/client/src/main/java/org/apache/atlas/AtlasClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java index 7e32cc2..be178dc 100755 --- a/client/src/main/java/org/apache/atlas/AtlasClient.java +++ b/client/src/main/java/org/apache/atlas/AtlasClient.java @@ -20,11 +20,14 @@ package org.apache.atlas; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter; import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; import org.apache.atlas.security.SecureClientUtils; import org.apache.atlas.typesystem.Referenceable; @@ -45,6 +48,7 @@ import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import javax.ws.rs.HttpMethod; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -54,8 +58,10 @@ import java.net.ConnectException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; -import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter; +import java.util.Map; + import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED; /** @@ -65,9 +71,10 @@ public class AtlasClient { private static final Logger LOG = LoggerFactory.getLogger(AtlasClient.class); public static final String NAME = "name"; - public static final String GUID = "GUID"; public static final String TYPE = "type"; public static final String TYPENAME = "typeName"; + public static final String GUID = "GUID"; + public static final String ENTITIES = "entities"; public static final String DEFINITION = "definition"; public static final String ERROR = "error"; @@ -340,6 +347,61 @@ public class AtlasClient { return service; } + public static class EntityResult { + private static final Gson gson = new GsonBuilder().setPrettyPrinting().create(); + + public static final String OP_CREATED = "created"; + public static final String OP_UPDATED = "updated"; + public static final String OP_DELETED = "deleted"; + + Map<String, List<String>> entities = new HashMap<>(); + + public EntityResult() { + //For gson + } + + public EntityResult(List<String> created, List<String> updated, List<String> deleted) { + add(OP_CREATED, created); + add(OP_UPDATED, updated); + add(OP_DELETED, deleted); + } + + private void add(String type, List<String> list) { + if (list != null && list.size() > 0) { + entities.put(type, list); + } + } + + private List<String> get(String type) { + List<String> list = entities.get(type); + if (list == null) { + list = new ArrayList<>(); + } + return list; + } + + public List<String> getCreatedEntities() { + return get(OP_CREATED); + } + + public List<String> getUpdateEntities() { + return get(OP_UPDATED); + } + + public List<String> getDeletedEntities() { + return get(OP_DELETED); + } + + @Override + public String toString() { + return gson.toJson(this); + } + + public static EntityResult fromString(String json) throws AtlasServiceException { + return gson.fromJson(json, EntityResult.class); + } + } + /** * Return status of the service instance the client is pointing to. * @@ -562,11 +624,15 @@ public class AtlasClient { protected List<String> createEntity(JSONArray entities) throws AtlasServiceException { LOG.debug("Creating entities: {}", entities); JSONObject response = callAPI(API.CREATE_ENTITY, entities.toString()); - List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>()); + List<String> results = extractEntityResult(response).getCreatedEntities(); LOG.debug("Create entities returned results: {}", results); return results; } + protected EntityResult extractEntityResult(JSONObject response) throws AtlasServiceException { + return EntityResult.fromString(response.toString()); + } + /** * Create the given entity * @param entitiesAsJson entity(type instance) as json @@ -601,19 +667,19 @@ public class AtlasClient { * @return json array of guids which were updated/created * @throws AtlasServiceException */ - public List<String> updateEntities(Referenceable... entities) throws AtlasServiceException { + public EntityResult updateEntities(Referenceable... entities) throws AtlasServiceException { return updateEntities(Arrays.asList(entities)); } - protected List<String> updateEntities(JSONArray entities) throws AtlasServiceException { + protected EntityResult updateEntities(JSONArray entities) throws AtlasServiceException { LOG.debug("Updating entities: {}", entities); JSONObject response = callAPI(API.UPDATE_ENTITY, entities.toString()); - List<String> results = extractResults(response, GUID, new ExtractOperation<String, String>()); + EntityResult results = extractEntityResult(response); LOG.debug("Update entities returned results: {}", results); return results; } - public List<String> updateEntities(Collection<Referenceable> entities) throws AtlasServiceException { + public EntityResult updateEntities(Collection<Referenceable> entities) throws AtlasServiceException { JSONArray entitiesArray = getEntitiesArray(entities); return updateEntities(entitiesArray); } @@ -625,9 +691,10 @@ public class AtlasClient { * @param attribute property key * @param value property value */ - public void updateEntityAttribute(final String guid, final String attribute, String value) throws AtlasServiceException { + public EntityResult updateEntityAttribute(final String guid, final String attribute, String value) + throws AtlasServiceException { LOG.debug("Updating entity id: {}, attribute name: {}, attribute value: {}", guid, attribute, value); - callAPIWithRetries(API.UPDATE_ENTITY_PARTIAL, value, new ResourceCreator() { + JSONObject response = callAPIWithRetries(API.UPDATE_ENTITY_PARTIAL, value, new ResourceCreator() { @Override public WebResource createResource() { API api = API.UPDATE_ENTITY_PARTIAL; @@ -636,6 +703,7 @@ public class AtlasClient { return resource; } }); + return extractEntityResult(response); } @VisibleForTesting @@ -665,10 +733,11 @@ public class AtlasClient { * @param guid guid * @param entity entity definition */ - public void updateEntity(String guid, Referenceable entity) throws AtlasServiceException { + public EntityResult updateEntity(String guid, Referenceable entity) throws AtlasServiceException { String entityJson = InstanceSerialization.toJson(entity, true); LOG.debug("Updating entity id {} with {}", guid, entityJson); - callAPI(API.UPDATE_ENTITY_PARTIAL, entityJson, guid); + JSONObject response = callAPI(API.UPDATE_ENTITY_PARTIAL, entityJson, guid); + return extractEntityResult(response); } /** @@ -691,8 +760,9 @@ public class AtlasClient { * @param uniqueAttributeValue Attribute Value that uniquely identifies the entity * @param entity entity definition */ - public String updateEntity(final String entityType, final String uniqueAttributeName, final String uniqueAttributeValue, - Referenceable entity) throws AtlasServiceException { + public EntityResult updateEntity(final String entityType, final String uniqueAttributeName, + final String uniqueAttributeValue, + Referenceable entity) throws AtlasServiceException { final API api = API.UPDATE_ENTITY_PARTIAL; String entityJson = InstanceSerialization.toJson(entity, true); LOG.debug("Updating entity type: {}, attributeName: {}, attributeValue: {}, entity: {}", entityType, @@ -707,7 +777,7 @@ public class AtlasClient { return resource; } }); - String result = getString(response, GUID); + EntityResult result = extractEntityResult(response); LOG.debug("Update entity returned result: {}", result); return result; } @@ -724,10 +794,10 @@ public class AtlasClient { * Delete the specified entities from the repository * * @param guids guids of entities to delete - * @return List of deleted entity guids + * @return List of entity ids updated/deleted * @throws AtlasServiceException */ - public List<String> deleteEntities(final String ... guids) throws AtlasServiceException { + public EntityResult deleteEntities(final String ... guids) throws AtlasServiceException { LOG.debug("Deleting entities: {}", guids); JSONObject jsonResponse = callAPIWithRetries(API.DELETE_ENTITIES, null, new ResourceCreator() { @Override @@ -740,7 +810,7 @@ public class AtlasClient { return resource; } }); - List<String> results = extractResults(jsonResponse, GUID, new ExtractOperation<String, String>()); + EntityResult results = extractEntityResult(jsonResponse); LOG.debug("Delete entities returned results: {}", results); return results; } @@ -750,9 +820,9 @@ public class AtlasClient { * @param entityType Type of the entity being deleted * @param uniqueAttributeName Attribute Name that uniquely identifies the entity * @param uniqueAttributeValue Attribute Value that uniquely identifies the entity - * @return List of deleted entity guids(including composite references from that entity) + * @return List of entity ids updated/deleted(including composite references from that entity) */ - public List<String> deleteEntity(String entityType, String uniqueAttributeName, String uniqueAttributeValue) + public EntityResult deleteEntity(String entityType, String uniqueAttributeName, String uniqueAttributeValue) throws AtlasServiceException { LOG.debug("Deleting entity type: {}, attributeName: {}, attributeValue: {}", entityType, uniqueAttributeName, uniqueAttributeValue); @@ -762,7 +832,7 @@ public class AtlasClient { resource = resource.queryParam(ATTRIBUTE_NAME, uniqueAttributeName); resource = resource.queryParam(ATTRIBUTE_VALUE, uniqueAttributeValue); JSONObject jsonResponse = callAPIWithResource(API.DELETE_ENTITIES, resource, null); - List<String> results = extractResults(jsonResponse, GUID, new ExtractOperation<String, String>()); + EntityResult results = extractEntityResult(jsonResponse); LOG.debug("Delete entities returned results: {}", results); return results; } @@ -901,7 +971,7 @@ public class AtlasClient { return extractResults(jsonResponse, AtlasClient.EVENTS, new ExtractOperation<EntityAuditEvent, JSONObject>() { @Override EntityAuditEvent extractElement(JSONObject element) throws JSONException { - return EntityAuditEvent.GSON.fromJson(element.toString(), EntityAuditEvent.class); + return SerDe.GSON.fromJson(element.toString(), EntityAuditEvent.class); } }); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/client/src/main/java/org/apache/atlas/EntityAuditEvent.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/EntityAuditEvent.java b/client/src/main/java/org/apache/atlas/EntityAuditEvent.java index 460f708..29a04ab 100644 --- a/client/src/main/java/org/apache/atlas/EntityAuditEvent.java +++ b/client/src/main/java/org/apache/atlas/EntityAuditEvent.java @@ -18,16 +18,14 @@ package org.apache.atlas; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; +import org.apache.atlas.typesystem.IReferenceableInstance; +import org.apache.atlas.typesystem.json.InstanceSerialization; import org.apache.commons.lang.StringUtils; /** * Structure of entity audit event */ public class EntityAuditEvent { - public static final Gson GSON = new GsonBuilder().create(); - public enum EntityAuditAction { ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, TAG_ADD, TAG_DELETE } @@ -38,16 +36,19 @@ public class EntityAuditEvent { private EntityAuditAction action; private String details; private String eventKey; + private IReferenceableInstance entityDefinition; public EntityAuditEvent() { } - public EntityAuditEvent(String entityId, Long ts, String user, EntityAuditAction action, String details) { + public EntityAuditEvent(String entityId, Long ts, String user, EntityAuditAction action, String details, + IReferenceableInstance entityDefinition) throws AtlasException { this.entityId = entityId; this.timestamp = ts; this.user = user; this.action = action; this.details = details; + this.entityDefinition = entityDefinition; } @Override @@ -62,10 +63,12 @@ public class EntityAuditEvent { EntityAuditEvent otherEvent = (EntityAuditEvent) other; return StringUtils.equals(entityId, otherEvent.entityId) && - (timestamp == otherEvent.timestamp) && - StringUtils.equals(user, otherEvent.user) && (action == otherEvent.action) && - StringUtils.equals(details, otherEvent.details) && - StringUtils.equals(eventKey, otherEvent.eventKey); + (timestamp == otherEvent.timestamp) && + StringUtils.equals(user, otherEvent.user) && + (action == otherEvent.action) && + StringUtils.equals(details, otherEvent.details) && + StringUtils.equals(eventKey, otherEvent.eventKey) && + StringUtils.equals(getEntityDefinitionString(), otherEvent.getEntityDefinitionString()); } @Override @@ -75,11 +78,11 @@ public class EntityAuditEvent { @Override public String toString() { - return GSON.toJson(this); + return SerDe.GSON.toJson(this); } public static EntityAuditEvent fromString(String eventString) { - return GSON.fromJson(eventString, EntityAuditEvent.class); + return SerDe.GSON.fromJson(eventString, EntityAuditEvent.class); } public String getEntityId() { @@ -129,4 +132,19 @@ public class EntityAuditEvent { public void setEventKey(String eventKey) { this.eventKey = eventKey; } + + public IReferenceableInstance getEntityDefinition() { + return entityDefinition; + } + + public String getEntityDefinitionString() { + if (entityDefinition != null) { + return InstanceSerialization.toJson(entityDefinition, true); + } + return null; + } + + public void setEntityDefinition(String entityDefinition) { + this.entityDefinition = InstanceSerialization.fromJsonReferenceable(entityDefinition, true); + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/client/src/main/java/org/apache/atlas/SerDe.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/SerDe.java b/client/src/main/java/org/apache/atlas/SerDe.java new file mode 100644 index 0000000..6b7478a --- /dev/null +++ b/client/src/main/java/org/apache/atlas/SerDe.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; +import org.apache.atlas.typesystem.IReferenceableInstance; +import org.apache.atlas.typesystem.IStruct; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.Struct; +import org.apache.atlas.typesystem.json.InstanceSerialization; + +import java.lang.reflect.Type; + +public class SerDe { + public static final Gson GSON = new GsonBuilder(). + registerTypeAdapter(IStruct.class, new StructDeserializer()). + registerTypeAdapter(IReferenceableInstance.class, new ReferenceableSerializerDeserializer()). + registerTypeAdapter(Referenceable.class, new ReferenceableSerializerDeserializer()). + create(); + + /** + * Serde for Struct used by AbstractNotificationConsumer.GSON. + */ + public static final class StructDeserializer implements JsonDeserializer<IStruct>, JsonSerializer<IStruct> { + @Override + public IStruct deserialize(final JsonElement json, final Type type, + final JsonDeserializationContext context) { + return context.deserialize(json, Struct.class); + } + + @Override + public JsonElement serialize(IStruct src, Type typeOfSrc, JsonSerializationContext context) { + String instanceJson = InstanceSerialization.toJson(src, true); + return new JsonParser().parse(instanceJson).getAsJsonObject(); + } + } + + /** + * Serde for Referenceable used by AbstractNotificationConsumer.GSON. + */ + public static final class ReferenceableSerializerDeserializer implements JsonDeserializer<IStruct>, + JsonSerializer<IReferenceableInstance> { + @Override + public IReferenceableInstance deserialize(final JsonElement json, final Type type, + final JsonDeserializationContext context) { + + return InstanceSerialization.fromJsonReferenceable(json.toString(), true); + } + + @Override + public JsonElement serialize(IReferenceableInstance src, Type typeOfSrc, JsonSerializationContext context) { + String instanceJson = InstanceSerialization.toJson(src, true); + return new JsonParser().parse(instanceJson).getAsJsonObject(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/client/src/test/java/org/apache/atlas/AtlasClientTest.java ---------------------------------------------------------------------- diff --git a/client/src/test/java/org/apache/atlas/AtlasClientTest.java b/client/src/test/java/org/apache/atlas/AtlasClientTest.java index 0e80573..77a387f 100644 --- a/client/src/test/java/org/apache/atlas/AtlasClientTest.java +++ b/client/src/test/java/org/apache/atlas/AtlasClientTest.java @@ -21,8 +21,12 @@ import com.sun.jersey.api.client.Client; import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.json.InstanceSerialization; import org.apache.commons.configuration.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.codehaus.jettison.json.JSONObject; +import org.mockito.Matchers; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.annotations.BeforeMethod; @@ -33,9 +37,12 @@ import javax.ws.rs.core.UriBuilder; import java.net.ConnectException; import java.net.URI; import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.List; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -76,6 +83,25 @@ public class AtlasClientTest { assertTrue(atlasClient.isServerReady()); } + @Test + public void testCreateEntity() throws Exception { + setupRetryParams(); + AtlasClient atlasClient = new AtlasClient(service, configuration); + + WebResource.Builder builder = setupBuilder(AtlasClient.API.CREATE_ENTITY, service); + ClientResponse response = mock(ClientResponse.class); + when(response.getStatus()).thenReturn(Response.Status.CREATED.getStatusCode()); + + JSONObject jsonResponse = new JSONObject(new AtlasClient.EntityResult(Arrays.asList("id"), null, null).toString()); + when(response.getEntity(String.class)).thenReturn(jsonResponse.toString()); + String entityJson = InstanceSerialization.toJson(new Referenceable("type"), true); + when(builder.method(anyString(), Matchers.<Class>any(), anyString())).thenReturn(response); + + List<String> ids = atlasClient.createEntity(entityJson); + assertEquals(ids.size(), 1); + assertEquals(ids.get(0), "id"); + } + private WebResource.Builder setupBuilder(AtlasClient.API api, WebResource webResource) { when(webResource.path(api.getPath())).thenReturn(service); WebResource.Builder builder = getBuilder(service); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java b/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java index 3f16a9a..6ef407a 100644 --- a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java +++ b/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java @@ -97,6 +97,11 @@ public class MessageVersion implements Comparable<MessageVersion> { } + @Override + public String toString() { + return "MessageVersion[version=" + version + "]"; + } + // ----- helper methods -------------------------------------------------- /** http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java index 384f383..ef8ee27 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java @@ -17,11 +17,11 @@ */ package org.apache.atlas.notification; +import com.google.gson.reflect.TypeToken; import org.apache.atlas.notification.entity.EntityMessageDeserializer; import org.apache.atlas.notification.entity.EntityNotification; import org.apache.atlas.notification.hook.HookMessageDeserializer; import org.apache.atlas.notification.hook.HookNotification; -import com.google.gson.reflect.TypeToken; import java.lang.reflect.Type; import java.util.List; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java index 290be59..cc2099e 100644 --- a/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java +++ b/notification/src/main/java/org/apache/atlas/notification/VersionedMessageDeserializer.java @@ -31,7 +31,7 @@ import java.lang.reflect.Type; public abstract class VersionedMessageDeserializer<T> implements MessageDeserializer<T> { public static final String VERSION_MISMATCH_MSG = - "Notification message version mismatch. Expected %s but recieved %s"; + "Notification message version mismatch. Expected %s but recieved %s. Message %s"; private final Type versionedMessageType; private final MessageVersion expectedVersion; @@ -90,18 +90,16 @@ public abstract class VersionedMessageDeserializer<T> implements MessageDeserial // message has newer version if (comp > 0) { - String msg = String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion()); + String msg = + String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion(), messageJson); notificationLogger.error(msg); - notificationLogger.info(messageJson); throw new IncompatibleVersionException(msg); } // message has older version if (comp < 0) { - notificationLogger.info( - String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion())); - - notificationLogger.info(messageJson); + notificationLogger.info(String.format(VERSION_MISMATCH_MSG, expectedVersion, versionedMessage.getVersion(), + messageJson)); } } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java b/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java deleted file mode 100644 index 300cbb5..0000000 --- a/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.notification.entity; - -import com.google.inject.Inject; -import org.apache.atlas.AtlasException; -import org.apache.atlas.listener.EntityChangeListener; -import org.apache.atlas.notification.NotificationInterface; -import org.apache.atlas.typesystem.IReferenceableInstance; -import org.apache.atlas.typesystem.IStruct; -import org.apache.atlas.typesystem.ITypedReferenceableInstance; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.types.TypeSystem; - -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; - -/** - * Listen to the repository for entity changes and produce entity change notifications. - */ -public class NotificationEntityChangeListener implements EntityChangeListener { - - private final NotificationInterface notificationInterface; - private final TypeSystem typeSystem; - - - // ----- Constructors ------------------------------------------------------ - - /** - * Construct a NotificationEntityChangeListener. - * - * @param notificationInterface the notification framework interface - * @param typeSystem the Atlas type system - */ - @Inject - public NotificationEntityChangeListener(NotificationInterface notificationInterface, TypeSystem typeSystem) { - this.notificationInterface = notificationInterface; - this.typeSystem = typeSystem; - } - - - // ----- EntityChangeListener ---------------------------------------------- - - @Override - public void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities) throws AtlasException { - notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_CREATE); - } - - @Override - public void onEntitiesUpdated(Collection<ITypedReferenceableInstance> entities) throws AtlasException { - notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_UPDATE); - } - - @Override - public void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException { - notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_ADD); - } - - @Override - public void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException { - notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_DELETE); - } - - @Override - public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities) throws AtlasException { - notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_DELETE); - } - - - // ----- helper methods ------------------------------------------------- - - - // send notification of entity change - private void notifyOfEntityEvent(Collection<ITypedReferenceableInstance> entityDefinitions, - EntityNotification.OperationType operationType) throws AtlasException { - List<EntityNotification> messages = new LinkedList<>(); - - for (IReferenceableInstance entityDefinition : entityDefinitions) { - Referenceable entity = new Referenceable(entityDefinition); - - EntityNotificationImpl notification = - new EntityNotificationImpl(entity, operationType, typeSystem); - - messages.add(notification); - } - - notificationInterface.send(NotificationInterface.NotificationType.ENTITIES, messages); - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java index e8b55ef..0c8990f 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java @@ -27,9 +27,13 @@ import java.lang.reflect.Type; import java.util.LinkedList; import java.util.List; +import static org.mockito.Matchers.endsWith; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import static org.testng.Assert.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; /** * AbstractNotificationConsumer tests. @@ -110,17 +114,17 @@ public class AbstractNotificationConsumerTest { assertTrue(consumer.hasNext()); assertEquals(new TestMessage("sValue2", 98), consumer.next()); - verify(logger).info(json2); + verify(logger).info(endsWith(json2)); assertTrue(consumer.hasNext()); assertEquals(new TestMessage("sValue3", 97), consumer.next()); - verify(logger).info(json3); + verify(logger).info(endsWith(json3)); assertTrue(consumer.hasNext()); assertEquals(new TestMessage("sValue4", 96), consumer.next()); - verify(logger).info(json4); + verify(logger).info(endsWith(json4)); assertFalse(consumer.hasNext()); } @@ -154,7 +158,7 @@ public class AbstractNotificationConsumerTest { consumer.next(); fail("Expected VersionMismatchException!"); } catch (IncompatibleVersionException e) { - verify(logger).info(json2); + verify(logger).error(endsWith(json2)); } assertFalse(consumer.hasNext()); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index fd17292..0402b49 100644 --- a/release-log.txt +++ b/release-log.txt @@ -3,6 +3,7 @@ Apache Atlas Release Notes --trunk - unreleased INCOMPATIBLE CHANGES: +ATLAS-716 Entity update/delete notifications (shwethags) ATLAS-619 Canonicalize hive queries (sumasai) ATLAS-497 Simple Authorization (saqeeb.s via yhemanth) ATLAS-661 REST API Authentication (nixonrodrigues via yhemanth) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java index 540c308..43e9f85 100755 --- a/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/MetadataRepository.java @@ -18,6 +18,7 @@ package org.apache.atlas.repository; +import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.typesystem.ITypedReferenceableInstance; import org.apache.atlas.typesystem.ITypedStruct; @@ -26,7 +27,6 @@ import org.apache.atlas.typesystem.exception.EntityNotFoundException; import org.apache.atlas.typesystem.exception.TraitNotFoundException; import org.apache.atlas.typesystem.types.AttributeInfo; import org.apache.atlas.typesystem.types.IDataType; -import org.apache.atlas.typesystem.types.TypeUtils; import java.util.List; @@ -111,7 +111,7 @@ public interface MetadataRepository { * @return guids of deleted entities * @throws RepositoryException */ - TypeUtils.Pair<List<String>, List<ITypedReferenceableInstance>> deleteEntities(List<String> guids) throws RepositoryException; + AtlasClient.EntityResult deleteEntities(List<String> guids) throws RepositoryException; // Trait management functions @@ -147,13 +147,13 @@ public interface MetadataRepository { * Adds/Updates the property to the entity that corresponds to the GUID * Supports only primitive attribute/Class Id updations. */ - TypeUtils.Pair<List<String>, List<String>> updatePartial(ITypedReferenceableInstance entity) throws RepositoryException; + AtlasClient.EntityResult updatePartial(ITypedReferenceableInstance entity) throws RepositoryException; /** * Adds the property to the entity that corresponds to the GUID * @param entitiesToBeUpdated The entities to be updated */ - TypeUtils.Pair<List<String>, List<String>> updateEntities(ITypedReferenceableInstance... entitiesToBeUpdated) throws RepositoryException; + AtlasClient.EntityResult updateEntities(ITypedReferenceableInstance... entitiesToBeUpdated) throws RepositoryException; /** * Returns the entity for the given type and qualified name http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java index 5b4bdbf..958ecaf 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java @@ -55,8 +55,9 @@ public class EntityAuditListener implements EntityChangeListener { } private EntityAuditEvent createEvent(ITypedReferenceableInstance entity, long ts, - EntityAuditEvent.EntityAuditAction action, String details) { - return new EntityAuditEvent(entity.getId()._getId(), ts, RequestContext.get().getUser(), action, details); + EntityAuditEvent.EntityAuditAction action, String details) + throws AtlasException { + return new EntityAuditEvent(entity.getId()._getId(), ts, RequestContext.get().getUser(), action, details, entity); } @Override http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java index 8f11322..22d71df 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java @@ -78,6 +78,7 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository public static final byte[] COLUMN_ACTION = Bytes.toBytes("action"); public static final byte[] COLUMN_DETAIL = Bytes.toBytes("detail"); public static final byte[] COLUMN_USER = Bytes.toBytes("user"); + public static final byte[] COLUMN_DEFINITION = Bytes.toBytes("def"); private TableName tableName; private Connection connection; @@ -110,6 +111,7 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository addColumn(put, COLUMN_ACTION, event.getAction()); addColumn(put, COLUMN_USER, event.getUser()); addColumn(put, COLUMN_DETAIL, event.getDetails()); + addColumn(put, COLUMN_DEFINITION, event.getEntityDefinitionString()); puts.add(put); } table.put(puts); @@ -183,6 +185,7 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository event.setUser(getResultString(result, COLUMN_USER)); event.setAction(EntityAuditEvent.EntityAuditAction.valueOf(getResultString(result, COLUMN_ACTION))); event.setDetails(getResultString(result, COLUMN_DETAIL)); + event.setEntityDefinition(getResultString(result, COLUMN_DEFINITION)); events.add(event); } LOG.info("Got events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, events.size()); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java b/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java index a9e4f39..91f9bd0 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java @@ -47,15 +47,16 @@ import static org.apache.atlas.repository.graph.GraphHelper.string; public abstract class DeleteHandler { public static final Logger LOG = LoggerFactory.getLogger(DeleteHandler.class); - private static final GraphHelper graphHelper = GraphHelper.getInstance(); + protected static final GraphHelper graphHelper = GraphHelper.getInstance(); protected TypeSystem typeSystem; private boolean shouldUpdateReverseAttribute; + private boolean softDelete; - public DeleteHandler(TypeSystem typeSystem, boolean shouldUpdateReverseAttribute) { + public DeleteHandler(TypeSystem typeSystem, boolean shouldUpdateReverseAttribute, boolean softDelete) { this.typeSystem = typeSystem; this.shouldUpdateReverseAttribute = shouldUpdateReverseAttribute; - + this.softDelete = softDelete; } /** @@ -64,16 +65,22 @@ public abstract class DeleteHandler { * @throws AtlasException */ public void deleteEntity(Vertex instanceVertex) throws AtlasException { + RequestContext requestContext = RequestContext.get(); String guid = GraphHelper.getIdFromVertex(instanceVertex); + Id.EntityState state = GraphHelper.getState(instanceVertex); + if (requestContext.getDeletedEntityIds().contains(guid) || state == Id.EntityState.DELETED) { + LOG.debug("Skipping deleting {} as its already deleted", guid); + return; + } String typeName = GraphHelper.getTypeName(instanceVertex); - RequestContext.get().recordDeletedEntity(guid, typeName); + requestContext.recordEntityDelete(guid, typeName); deleteAllTraits(instanceVertex); - deleteTypeVertex(instanceVertex); + deleteTypeVertex(instanceVertex, false); } - protected abstract void deleteEdge(Edge edge) throws AtlasException; + protected abstract void deleteEdge(Edge edge, boolean force) throws AtlasException; /** * Deletes a type vertex - can be entity(class type) or just vertex(struct/trait type) @@ -81,11 +88,11 @@ public abstract class DeleteHandler { * @param typeCategory * @throws AtlasException */ - protected void deleteTypeVertex(Vertex instanceVertex, DataTypes.TypeCategory typeCategory) throws AtlasException { + protected void deleteTypeVertex(Vertex instanceVertex, DataTypes.TypeCategory typeCategory, boolean force) throws AtlasException { switch (typeCategory) { case STRUCT: case TRAIT: - deleteTypeVertex(instanceVertex); + deleteTypeVertex(instanceVertex, force); break; case CLASS: @@ -102,7 +109,7 @@ public abstract class DeleteHandler { * @param instanceVertex * @throws AtlasException */ - protected void deleteTypeVertex(Vertex instanceVertex) throws AtlasException { + protected void deleteTypeVertex(Vertex instanceVertex, boolean force) throws AtlasException { LOG.debug("Deleting {}", string(instanceVertex)); String typeName = GraphHelper.getTypeName(instanceVertex); IDataType type = typeSystem.getDataType(IDataType.class, typeName); @@ -115,12 +122,12 @@ public abstract class DeleteHandler { switch (attributeInfo.dataType().getTypeCategory()) { case CLASS: //If its class attribute, delete the reference - deleteReference(instanceVertex, edgeLabel, DataTypes.TypeCategory.CLASS, attributeInfo.isComposite); + deleteEdgeReference(instanceVertex, edgeLabel, DataTypes.TypeCategory.CLASS, attributeInfo.isComposite); break; case STRUCT: //If its struct attribute, delete the reference - deleteReference(instanceVertex, edgeLabel, DataTypes.TypeCategory.STRUCT); + deleteEdgeReference(instanceVertex, edgeLabel, DataTypes.TypeCategory.STRUCT, false); break; case ARRAY: @@ -133,7 +140,7 @@ public abstract class DeleteHandler { if (edges != null) { while (edges.hasNext()) { Edge edge = edges.next(); - deleteReference(edge, elementType, attributeInfo); + deleteEdgeReference(edge, elementType.getTypeCategory(), attributeInfo.isComposite, false); } } } @@ -151,22 +158,31 @@ public abstract class DeleteHandler { if (keys != null) { for (String key : keys) { String mapEdgeLabel = GraphHelper.getQualifiedNameForMapKey(edgeLabel, key); - deleteReference(instanceVertex, mapEdgeLabel, valueTypeCategory, attributeInfo.isComposite); + deleteEdgeReference(instanceVertex, mapEdgeLabel, valueTypeCategory, attributeInfo.isComposite); } } } } } - deleteVertex(instanceVertex, type.getTypeCategory()); + deleteVertex(instanceVertex, force); } - public void deleteReference(Edge edge, IDataType dataType, AttributeInfo attributeInfo) throws AtlasException { - deleteReference(edge, dataType.getTypeCategory(), attributeInfo.isComposite); - } - - public void deleteReference(Edge edge, DataTypes.TypeCategory typeCategory, boolean isComposite) throws AtlasException { + /** + * Force delete is used to remove struct/trait in case of entity updates + * @param edge + * @param typeCategory + * @param isComposite + * @param forceDeleteStructTrait + * @return returns true if the edge reference is hard deleted + * @throws AtlasException + */ + public boolean deleteEdgeReference(Edge edge, DataTypes.TypeCategory typeCategory, boolean isComposite, + boolean forceDeleteStructTrait) throws AtlasException { LOG.debug("Deleting {}", string(edge)); + boolean forceDelete = + (typeCategory == DataTypes.TypeCategory.STRUCT || typeCategory == DataTypes.TypeCategory.TRAIT) + ? forceDeleteStructTrait : false; if (typeCategory == DataTypes.TypeCategory.STRUCT || typeCategory == DataTypes.TypeCategory.TRAIT || (typeCategory == DataTypes.TypeCategory.CLASS && isComposite)) { //If the vertex is of type struct/trait, delete the edge and then the reference vertex as the vertex is not shared by any other entities. @@ -175,32 +191,28 @@ public abstract class DeleteHandler { Vertex vertexForDelete = edge.getVertex(Direction.IN); //If deleting the edge and then the in vertex, reverse attribute shouldn't be updated - deleteEdge(edge, false); - deleteTypeVertex(vertexForDelete, typeCategory); + deleteEdge(edge, false, forceDelete); + deleteTypeVertex(vertexForDelete, typeCategory, forceDelete); } else { //If the vertex is of type class, and its not a composite attributes, the reference vertex' lifecycle is not controlled //through this delete. Hence just remove the reference edge. Leave the reference vertex as is //If deleting just the edge, reverse attribute should be updated for any references //For example, for the department type system, if the person's manager edge is deleted, subordinates of manager should be updated - deleteEdge(edge, true); + deleteEdge(edge, true, false); } + return !softDelete || forceDelete; } - public void deleteReference(Vertex instanceVertex, String edgeLabel, DataTypes.TypeCategory typeCategory) - throws AtlasException { - deleteReference(instanceVertex, edgeLabel, typeCategory, false); - } - - public void deleteReference(Vertex instanceVertex, String edgeLabel, DataTypes.TypeCategory typeCategory, - boolean isComposite) throws AtlasException { - Edge edge = GraphHelper.getEdgeForLabel(instanceVertex, edgeLabel); + public void deleteEdgeReference(Vertex outVertex, String edgeLabel, DataTypes.TypeCategory typeCategory, + boolean isComposite) throws AtlasException { + Edge edge = GraphHelper.getEdgeForLabel(outVertex, edgeLabel); if (edge != null) { - deleteReference(edge, typeCategory, isComposite); + deleteEdgeReference(edge, typeCategory, isComposite, false); } } - protected void deleteEdge(Edge edge, boolean updateReverseAttribute) throws AtlasException { + protected void deleteEdge(Edge edge, boolean updateReverseAttribute, boolean force) throws AtlasException { //update reverse attribute if (updateReverseAttribute) { AttributeInfo attributeInfo = getAttributeForEdge(edge.getLabel()); @@ -210,28 +222,28 @@ public abstract class DeleteHandler { } } - deleteEdge(edge); + deleteEdge(edge, force); } - protected void deleteVertex(Vertex instanceVertex, DataTypes.TypeCategory typeCategory) throws AtlasException { + protected void deleteVertex(Vertex instanceVertex, boolean force) throws AtlasException { //Update external references(incoming edges) to this vertex LOG.debug("Setting the external references to {} to null(removing edges)", string(instanceVertex)); Iterator<Edge> edges = instanceVertex.getEdges(Direction.IN).iterator(); while(edges.hasNext()) { Edge edge = edges.next(); - String edgeState = edge.getProperty(Constants.STATE_PROPERTY_KEY); - if (Id.EntityState.ACTIVE.name().equals(edgeState)) { + Id.EntityState edgeState = GraphHelper.getState(edge); + if (edgeState == Id.EntityState.ACTIVE) { //Delete only the active edge references AttributeInfo attribute = getAttributeForEdge(edge.getLabel()); + //TODO use delete edge instead?? deleteEdgeBetweenVertices(edge.getVertex(Direction.OUT), edge.getVertex(Direction.IN), attribute.name); - deleteEdge(edge); } } - _deleteVertex(instanceVertex); + _deleteVertex(instanceVertex, force); } - protected abstract void _deleteVertex(Vertex instanceVertex); + protected abstract void _deleteVertex(Vertex instanceVertex, boolean force); /** * Deletes the edge between outvertex and inVertex. The edge is for attribute attributeName of outVertex @@ -245,7 +257,8 @@ public abstract class DeleteHandler { attributeName); String typeName = GraphHelper.getTypeName(outVertex); String outId = GraphHelper.getIdFromVertex(outVertex); - if (outId != null && RequestContext.get().isDeletedEntity(outId)) { + Id.EntityState state = GraphHelper.getState(outVertex); + if ((outId != null && RequestContext.get().isDeletedEntity(outId)) || state == Id.EntityState.DELETED) { //If the reference vertex is marked for deletion, skip updating the reference return; } @@ -261,8 +274,10 @@ public abstract class DeleteHandler { //If its class attribute, its the only edge between two vertices if (attributeInfo.multiplicity.nullAllowed()) { edge = GraphHelper.getEdgeForLabel(outVertex, edgeLabel); - } - else { + if (shouldUpdateReverseAttribute) { + GraphHelper.setProperty(outVertex, propertyName, null); + } + } else { // Cannot unset a required attribute. throw new NullRequiredAttributeException("Cannot unset required attribute " + GraphHelper.getQualifiedFieldName(type, attributeName) + " on " + string(outVertex) + " edge = " + edgeLabel); @@ -275,23 +290,26 @@ public abstract class DeleteHandler { if (elements != null) { elements = new ArrayList<>(elements); //Make a copy, else list.remove reflects on titan.getProperty() for (String elementEdgeId : elements) { - Edge elementEdge = graphHelper.getEdgeById(elementEdgeId); + Edge elementEdge = graphHelper.getEdgeByEdgeId(outVertex, edgeLabel, elementEdgeId); if (elementEdge == null) { continue; } Vertex elementVertex = elementEdge.getVertex(Direction.IN); if (elementVertex.getId().toString().equals(inVertex.getId().toString())) { - if (attributeInfo.multiplicity.nullAllowed() || elements.size() > attributeInfo.multiplicity.lower) { - edge = elementEdge; - } - else { + edge = elementEdge; + + //TODO element.size includes deleted items as well. should exclude + if (!attributeInfo.multiplicity.nullAllowed() + && elements.size() <= attributeInfo.multiplicity.lower) { // Deleting this edge would violate the attribute's lower bound. throw new NullRequiredAttributeException( - "Cannot remove array element from required attribute " + - GraphHelper.getQualifiedFieldName(type, attributeName) + " on " + string(outVertex) + " " + string(elementEdge)); + "Cannot remove array element from required attribute " + + GraphHelper.getQualifiedFieldName(type, attributeName) + " on " + + string(outVertex) + " " + string(elementEdge)); } - if (shouldUpdateReverseAttribute || attributeInfo.isComposite) { + + if (shouldUpdateReverseAttribute) { //if composite attribute, remove the reference as well. else, just remove the edge //for example, when table is deleted, process still references the table //but when column is deleted, table will not reference the deleted column @@ -299,8 +317,9 @@ public abstract class DeleteHandler { attributeName); elements.remove(elementEdge.getId().toString()); GraphHelper.setProperty(outVertex, propertyName, elements); + break; + } - break; } } } @@ -312,11 +331,12 @@ public abstract class DeleteHandler { if (keys != null) { keys = new ArrayList<>(keys); //Make a copy, else list.remove reflects on titan.getProperty() for (String key : keys) { - String keyPropertyName = propertyName + "." + key; + String keyPropertyName = GraphHelper.getQualifiedNameForMapKey(propertyName, key); String mapEdgeId = outVertex.getProperty(keyPropertyName); - Edge mapEdge = graphHelper.getEdgeById(mapEdgeId); + Edge mapEdge = graphHelper.getEdgeByEdgeId(outVertex, keyPropertyName, mapEdgeId); Vertex mapVertex = mapEdge.getVertex(Direction.IN); if (mapVertex.getId().toString().equals(inVertex.getId().toString())) { + //TODO keys.size includes deleted items as well. should exclude if (attributeInfo.multiplicity.nullAllowed() || keys.size() > attributeInfo.multiplicity.lower) { edge = mapEdge; } @@ -327,7 +347,7 @@ public abstract class DeleteHandler { GraphHelper.getQualifiedFieldName(type, attributeName) + " on " + string(outVertex) + " " + string(mapEdge)); } - if (shouldUpdateReverseAttribute || attributeInfo.isComposite) { + if (shouldUpdateReverseAttribute) { //remove this key LOG.debug("Removing edge {}, key {} from the map attribute {}", string(mapEdge), key, attributeName); @@ -351,9 +371,11 @@ public abstract class DeleteHandler { } if (edge != null) { - deleteEdge(edge); + deleteEdge(edge, false); + RequestContext requestContext = RequestContext.get(); GraphHelper.setProperty(outVertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, - RequestContext.get().getRequestTime()); + requestContext.getRequestTime()); + requestContext.recordEntityUpdate(outId); } } @@ -389,7 +411,7 @@ public abstract class DeleteHandler { for (String traitNameToBeDeleted : traitNames) { String relationshipLabel = GraphHelper.getTraitLabel(typeName, traitNameToBeDeleted); - deleteReference(instanceVertex, relationshipLabel, DataTypes.TypeCategory.TRAIT); + deleteEdgeReference(instanceVertex, relationshipLabel, DataTypes.TypeCategory.TRAIT, false); } } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java index 3604277..0d82d90 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java @@ -22,8 +22,10 @@ import com.google.common.base.Preconditions; import com.google.inject.Inject; import com.google.inject.Singleton; import com.thinkaurelius.titan.core.TitanGraph; +import com.tinkerpop.blueprints.Edge; import com.tinkerpop.blueprints.GraphQuery; import com.tinkerpop.blueprints.Vertex; +import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.GraphTransaction; import org.apache.atlas.RequestContext; @@ -40,7 +42,6 @@ import org.apache.atlas.typesystem.types.ClassType; import org.apache.atlas.typesystem.types.DataTypes; import org.apache.atlas.typesystem.types.IDataType; import org.apache.atlas.typesystem.types.TypeSystem; -import org.apache.atlas.typesystem.types.TypeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -258,8 +259,8 @@ public class GraphBackedMetadataRepository implements MetadataRepository { try { final String entityTypeName = GraphHelper.getTypeName(instanceVertex); String relationshipLabel = GraphHelper.getTraitLabel(entityTypeName, traitNameToBeDeleted); - - deleteHandler.deleteReference(instanceVertex, relationshipLabel, DataTypes.TypeCategory.TRAIT); + Edge edge = GraphHelper.getEdgeForLabel(instanceVertex, relationshipLabel); + deleteHandler.deleteEdgeReference(edge, DataTypes.TypeCategory.TRAIT, false, true); // update the traits in entity once trait removal is successful traitNames.remove(traitNameToBeDeleted); @@ -284,14 +285,15 @@ public class GraphBackedMetadataRepository implements MetadataRepository { @Override @GraphTransaction - public TypeUtils.Pair<List<String>, List<String>> updateEntities(ITypedReferenceableInstance... entitiesUpdated) throws RepositoryException { + public AtlasClient.EntityResult updateEntities(ITypedReferenceableInstance... entitiesUpdated) throws RepositoryException { LOG.info("updating entity {}", entitiesUpdated); try { TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper, deleteHandler); instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.UPDATE_FULL, entitiesUpdated); RequestContext requestContext = RequestContext.get(); - return TypeUtils.Pair.of(requestContext.getCreatedEntityIds(), requestContext.getUpdatedEntityIds()); + return new AtlasClient.EntityResult(requestContext.getCreatedEntityIds(), + requestContext.getUpdatedEntityIds(), requestContext.getDeletedEntityIds()); } catch (AtlasException e) { throw new RepositoryException(e); } @@ -299,13 +301,14 @@ public class GraphBackedMetadataRepository implements MetadataRepository { @Override @GraphTransaction - public TypeUtils.Pair<List<String>, List<String>> updatePartial(ITypedReferenceableInstance entity) throws RepositoryException { + public AtlasClient.EntityResult updatePartial(ITypedReferenceableInstance entity) throws RepositoryException { LOG.info("updating entity {}", entity); try { TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper, deleteHandler); instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.UPDATE_PARTIAL, entity); RequestContext requestContext = RequestContext.get(); - return TypeUtils.Pair.of(requestContext.getCreatedEntityIds(), requestContext.getUpdatedEntityIds()); + return new AtlasClient.EntityResult(requestContext.getCreatedEntityIds(), + requestContext.getUpdatedEntityIds(), requestContext.getDeletedEntityIds()); } catch (AtlasException e) { throw new RepositoryException(e); } @@ -313,7 +316,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { @Override @GraphTransaction - public TypeUtils.Pair<List<String>, List<ITypedReferenceableInstance>> deleteEntities(List<String> guids) throws RepositoryException { + public AtlasClient.EntityResult deleteEntities(List<String> guids) throws RepositoryException { if (guids == null || guids.size() == 0) { throw new IllegalArgumentException("guids must be non-null and non-empty"); @@ -337,6 +340,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { } } RequestContext requestContext = RequestContext.get(); - return new TypeUtils.Pair<>(requestContext.getDeletedEntityIds(), requestContext.getDeletedEntities()); + return new AtlasClient.EntityResult(requestContext.getCreatedEntityIds(), + requestContext.getUpdatedEntityIds(), requestContext.getDeletedEntityIds()); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/705014eb/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java index cccafc2..4f6d011 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java @@ -107,11 +107,13 @@ public final class GraphHelper { // add timestamp information setProperty(vertexWithoutIdentity, Constants.TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime()); + setProperty(vertexWithoutIdentity, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, + RequestContext.get().getRequestTime()); return vertexWithoutIdentity; } - public Edge addEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) { + private Edge addEdge(Vertex fromVertex, Vertex toVertex, String edgeLabel) { LOG.debug("Adding edge for {} -> label {} -> {}", string(fromVertex), edgeLabel, string(toVertex)); Edge edge = titanGraph.addEdge(null, fromVertex, toVertex, edgeLabel); @@ -127,12 +129,34 @@ public final class GraphHelper { Iterable<Edge> edges = inVertex.getEdges(Direction.IN, edgeLabel); for (Edge edge : edges) { if (edge.getVertex(Direction.OUT).getId().toString().equals(outVertex.getId().toString())) { - return edge; + Id.EntityState edgeState = getState(edge); + if (edgeState == null || edgeState == Id.EntityState.ACTIVE) { + return edge; + } } } return addEdge(outVertex, inVertex, edgeLabel); } + + public Edge getEdgeByEdgeId(Vertex outVertex, String edgeLabel, String edgeId) { + if (edgeId == null) { + return null; + } + return titanGraph.getEdge(edgeId); + + //TODO get edge id is expensive. Use this logic. But doesn't work for now + /** + Iterable<Edge> edges = outVertex.getEdges(Direction.OUT, edgeLabel); + for (Edge edge : edges) { + if (edge.getId().toString().equals(edgeId)) { + return edge; + } + } + return null; + **/ + } + /** * Args of the format prop1, key1, prop2, key2... * Searches for a vertex with prop1=key1 && prop2=key2 @@ -180,15 +204,14 @@ public final class GraphHelper { * @return */ public static Edge getEdgeForLabel(Vertex vertex, String edgeLabel) { - String vertexState = vertex.getProperty(Constants.STATE_PROPERTY_KEY); - Iterator<Edge> iterator = GraphHelper.getOutGoingEdgesByLabel(vertex, edgeLabel); Edge latestDeletedEdge = null; long latestDeletedEdgeTime = Long.MIN_VALUE; + while (iterator != null && iterator.hasNext()) { Edge edge = iterator.next(); - String edgeState = edge.getProperty(Constants.STATE_PROPERTY_KEY); - if (edgeState == null || Id.EntityState.ACTIVE.name().equals(edgeState)) { + Id.EntityState edgeState = getState(edge); + if (edgeState == null || edgeState == Id.EntityState.ACTIVE) { LOG.debug("Found {}", string(edge)); return edge; } else { @@ -201,19 +224,8 @@ public final class GraphHelper { } //If the vertex is deleted, return latest deleted edge - if (Id.EntityState.DELETED.equals(vertexState)) { - LOG.debug("Found {}", string(latestDeletedEdge)); - return latestDeletedEdge; - } - - return null; - } - - public Edge getEdgeById(String edgeId) { - if(edgeId != null) { - return titanGraph.getEdge(edgeId); - } - return null; + LOG.debug("Found {}", latestDeletedEdge == null ? "null" : string(latestDeletedEdge)); + return latestDeletedEdge; } public static String vertexString(final Vertex vertex) { @@ -343,6 +355,15 @@ public final class GraphHelper { return instanceVertex.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY); } + public static Id.EntityState getState(Element element) { + String state = getStateAsString(element); + return state == null ? null : Id.EntityState.valueOf(state); + } + + public static String getStateAsString(Element element) { + return element.getProperty(Constants.STATE_PROPERTY_KEY); + } + /** * For the given type, finds an unique attribute and checks if there is an existing instance with the same * unique value
