ATLAS-540 API to retrieve entity version events (shwethags)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/c2356f8e Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/c2356f8e Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/c2356f8e Branch: refs/heads/master Commit: c2356f8efdbb8a14ee79d773bacc23b7eb743615 Parents: 85afbef Author: Shwetha GS <[email protected]> Authored: Mon Apr 11 16:45:52 2016 +0530 Committer: Shwetha GS <[email protected]> Committed: Mon Apr 11 17:03:33 2016 +0530 ---------------------------------------------------------------------- addons/falcon-bridge/pom.xml | 3 +- .../apache/atlas/falcon/hook/FalconHookIT.java | 7 +- .../atlas/hive/bridge/HiveMetaStoreBridge.java | 55 +++----- .../org/apache/atlas/hive/hook/HiveHook.java | 3 +- .../org/apache/atlas/hive/hook/HiveHookIT.java | 2 +- .../apache/atlas/sqoop/hook/SqoopHookIT.java | 16 +-- .../main/java/org/apache/atlas/AtlasClient.java | 88 +++++++++++-- .../java/org/apache/atlas/EntityAuditEvent.java | 132 +++++++++++++++++++ .../org/apache/atlas/utils/ParamChecker.java | 15 +++ distro/pom.xml | 2 + distro/src/conf/atlas-application.properties | 5 + .../java/org/apache/atlas/hook/AtlasHook.java | 3 - .../atlas/notification/NotificationModule.java | 15 +-- pom.xml | 3 +- release-log.txt | 1 + repository/pom.xml | 5 + .../apache/atlas/RepositoryMetadataModule.java | 9 +- .../repository/audit/EntityAuditListener.java | 39 +++--- .../repository/audit/EntityAuditRepository.java | 82 +----------- .../audit/HBaseBasedAuditRepository.java | 79 +++++++---- .../audit/InMemoryEntityAuditRepository.java | 15 ++- .../atlas/services/DefaultMetadataService.java | 56 +++++--- .../audit/AuditRepositoryTestBase.java | 58 +++++--- .../service/DefaultMetadataServiceTest.java | 104 ++++++++++++--- server-api/pom.xml | 2 + .../apache/atlas/services/MetadataService.java | 10 ++ .../atlas/web/listeners/GuiceServletConfig.java | 3 +- 
.../atlas/web/resources/EntityResource.java | 61 +++++++-- .../atlas/web/resources/TypesResource.java | 5 +- .../service/ActiveInstanceElectorModule.java | 4 +- .../apache/atlas/web/service/ServiceModule.java | 41 ++++++ .../web/resources/EntityJerseyResourceIT.java | 16 +++ 32 files changed, 647 insertions(+), 292 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/addons/falcon-bridge/pom.xml ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml index 03cb11e..9b07c9f 100644 --- a/addons/falcon-bridge/pom.xml +++ b/addons/falcon-bridge/pom.xml @@ -205,8 +205,7 @@ <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-maven-plugin</artifactId> <configuration> - <!--<skip>${skipTests}</skip>--> - <!--only skip int tests --> + <skip>${skipTests}</skip> <httpConnector> <port>31000</port> <idleTimeout>60000</idleTimeout> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java index 4249a8f..4e2a06f 100644 --- a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java +++ b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java @@ -38,7 +38,6 @@ import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.security.CurrentUser; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.security.UserGroupInformation; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONObject; import 
org.slf4j.Logger; @@ -63,11 +62,10 @@ public class FalconHookIT { private AtlasClient atlasClient; private static final ConfigurationStore STORE = ConfigurationStore.get(); - private Configuration atlasProperties; @BeforeClass public void setUp() throws Exception { - atlasProperties = ApplicationProperties.get(); + Configuration atlasProperties = ApplicationProperties.get(); atlasClient = new AtlasClient(atlasProperties.getString("atlas.rest.address")); AtlasService service = new AtlasService(); @@ -83,8 +81,7 @@ public class FalconHookIT { return; } - HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasProperties, - UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser()); + HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasClient); hiveMetaStoreBridge.registerHiveDataModel(); FalconDataModelGenerator dataModelGenerator = new FalconDataModelGenerator(); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java index 6b348e2..f007a32 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java @@ -18,7 +18,6 @@ package org.apache.atlas.hive.bridge; -import com.google.common.annotations.VisibleForTesting; import com.sun.jersey.api.client.ClientResponse; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; @@ -33,7 +32,6 @@ import org.apache.atlas.typesystem.Struct; import org.apache.atlas.typesystem.json.InstanceSerialization; import 
org.apache.atlas.typesystem.json.TypesSerialization; import org.apache.commons.configuration.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -66,32 +64,19 @@ public class HiveMetaStoreBridge { public static final String TABLE_TYPE_ATTR = "tableType"; public static final String SEARCH_ENTRY_GUID_ATTR = "__guid"; public static final String LAST_ACCESS_TIME_ATTR = "lastAccessTime"; - private final String clusterName; public static final String ATLAS_ENDPOINT = "atlas.rest.address"; - private final String doAsUser; - private final UserGroupInformation ugi; - private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreBridge.class); public final Hive hiveClient; - private final AtlasClient atlasClient; - - /** - * Construct a HiveMetaStoreBridge. - * @param hiveConf {@link HiveConf} for Hive component in the cluster - * @param atlasConf {@link Configuration} for Atlas component in the cluster - * @throws Exception - */ - public HiveMetaStoreBridge(HiveConf hiveConf, Configuration atlasConf) throws Exception { - this(hiveConf, atlasConf, null, null); - } + private AtlasClient atlasClient = null; - @VisibleForTesting HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient) { - this(clusterName, hiveClient, atlasClient, null, null); + this.clusterName = clusterName; + this.hiveClient = hiveClient; + this.atlasClient = atlasClient; } public String getClusterName() { @@ -101,26 +86,20 @@ public class HiveMetaStoreBridge { /** * Construct a HiveMetaStoreBridge. 
* @param hiveConf {@link HiveConf} for Hive component in the cluster - * @param doAsUser The user accessing Atlas service - * @param ugi {@link UserGroupInformation} representing the Atlas service */ - public HiveMetaStoreBridge(HiveConf hiveConf, Configuration atlasConf, String doAsUser, - UserGroupInformation ugi) throws Exception { - this(hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME), - Hive.get(hiveConf), - new AtlasClient(atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL), ugi, doAsUser), doAsUser, ugi); + public HiveMetaStoreBridge(HiveConf hiveConf) throws Exception { + this(hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME), Hive.get(hiveConf), null); } - @VisibleForTesting - HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient, String user, UserGroupInformation ugi) { - this.clusterName = clusterName; - this.hiveClient = hiveClient; - this.atlasClient = atlasClient; - this.doAsUser = user; - this.ugi = ugi; + /** + * Construct a HiveMetaStoreBridge. 
+ * @param hiveConf {@link HiveConf} for Hive component in the cluster + */ + public HiveMetaStoreBridge(HiveConf hiveConf, AtlasClient atlasClient) throws Exception { + this(hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME), Hive.get(hiveConf), atlasClient); } - private AtlasClient getAtlasClient() { + AtlasClient getAtlasClient() { return atlasClient; } @@ -200,7 +179,7 @@ public class HiveMetaStoreBridge { String entityJSON = InstanceSerialization.toJson(referenceable, true); LOG.debug("Submitting new entity {} = {}", referenceable.getTypeName(), entityJSON); - JSONArray guids = atlasClient.createEntity(entityJSON); + JSONArray guids = getAtlasClient().createEntity(entityJSON); LOG.debug("created instance for type " + typeName + ", guid: " + guids); return new Referenceable(guids.getString(0), referenceable.getTypeName(), null); @@ -539,7 +518,11 @@ public class HiveMetaStoreBridge { public static void main(String[] argv) throws Exception { Configuration atlasConf = ApplicationProperties.get(); - HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasConf); + String atlasEndpoint = atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + AtlasClient atlasClient = new AtlasClient(atlasEndpoint, ugi, ugi.getShortUserName()); + + HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasClient); hiveMetaStoreBridge.registerHiveDataModel(); hiveMetaStoreBridge.importHiveMetadata(); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index b947a8c..a0f1d40 100755 --- 
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -46,7 +46,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.HiveOperation; - import org.apache.hadoop.security.UserGroupInformation; import org.json.JSONObject; import org.slf4j.Logger; @@ -290,7 +289,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { LOG.info("Entered Atlas hook for hook type {} operation {}", event.getHookType(), event.getOperation()); - HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(hiveConf, atlasProperties, event.getUser(), event.getUgi()); + HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(hiveConf); switch (event.getOperation()) { case CREATEDATABASE: http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java index 2f0c71f..dbba926 100755 --- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java +++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java @@ -89,7 +89,7 @@ public class HiveHookIT { Configuration configuration = ApplicationProperties.get(); dgiCLient = new AtlasClient(configuration.getString(HiveMetaStoreBridge.ATLAS_ENDPOINT, DGI_URL)); - HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(conf, configuration); + HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(conf, dgiCLient); hiveMetaStoreBridge.registerHiveDataModel(); } 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java ---------------------------------------------------------------------- diff --git a/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java b/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java index 0e4658a..2820169 100644 --- a/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java +++ b/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java @@ -28,7 +28,6 @@ import org.apache.atlas.sqoop.model.SqoopDataModelGenerator; import org.apache.atlas.sqoop.model.SqoopDataTypes; import org.apache.commons.configuration.Configuration; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.sqoop.SqoopJobDataPublisher; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONObject; @@ -43,20 +42,19 @@ public class SqoopHookIT { private static final String CLUSTER_NAME = "primary"; public static final String DEFAULT_DB = "default"; private static final int MAX_WAIT_TIME = 2000; - private AtlasClient dgiCLient; + private AtlasClient atlasClient; @BeforeClass public void setUp() throws Exception { //Set-up sqoop session Configuration configuration = ApplicationProperties.get(); - dgiCLient = new AtlasClient(configuration.getString("atlas.rest.address")); - registerDataModels(dgiCLient, configuration); + atlasClient = new AtlasClient(configuration.getString("atlas.rest.address")); + registerDataModels(atlasClient); } - private void registerDataModels(AtlasClient client, Configuration atlasConf) throws Exception { + private void registerDataModels(AtlasClient client) throws Exception { // Make sure hive model exists - HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasConf, - 
UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser()); + HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasClient); hiveMetaStoreBridge.registerHiveDataModel(); SqoopDataModelGenerator dataModelGenerator = new SqoopDataModelGenerator(); @@ -118,12 +116,12 @@ public class SqoopHookIT { waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { - JSONArray results = dgiCLient.search(query); + JSONArray results = atlasClient.search(query); return results.length() > 0; } }); - JSONArray results = dgiCLient.search(query); + JSONArray results = atlasClient.search(query); JSONObject row = results.getJSONObject(0).getJSONObject("t"); return row.getString("id"); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/client/src/main/java/org/apache/atlas/AtlasClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java index 938a0a3..cc87706 100755 --- a/client/src/main/java/org/apache/atlas/AtlasClient.java +++ b/client/src/main/java/org/apache/atlas/AtlasClient.java @@ -71,11 +71,16 @@ public class AtlasClient { public static final String ROWS = "rows"; public static final String DATATYPE = "dataType"; + public static final String EVENTS = "events"; + public static final String START_KEY = "startKey"; + public static final String NUM_RESULTS = "count"; + public static final String BASE_URI = "api/atlas/"; public static final String ADMIN_VERSION = "admin/version"; public static final String ADMIN_STATUS = "admin/status"; public static final String TYPES = "types"; public static final String URI_ENTITY = "entities"; + public static final String URI_ENTITY_AUDIT = "audit"; public static final String URI_SEARCH = "discovery/search"; public static final String URI_LINEAGE = "lineage/hive/table"; @@ 
-351,6 +356,9 @@ public class AtlasClient { DELETE_ENTITIES(BASE_URI + URI_ENTITY, HttpMethod.DELETE, Response.Status.OK), DELETE_ENTITY(BASE_URI + URI_ENTITY, HttpMethod.DELETE, Response.Status.OK), + //audit operation + LIST_ENTITY_AUDIT(BASE_URI + URI_ENTITY, HttpMethod.GET, Response.Status.OK), + //Trait operations ADD_TRAITS(BASE_URI + URI_ENTITY, HttpMethod.POST, Response.Status.CREATED), DELETE_TRAITS(BASE_URI + URI_ENTITY, HttpMethod.DELETE, Response.Status.OK), @@ -396,7 +404,12 @@ public class AtlasClient { */ public List<String> createType(String typeAsJson) throws AtlasServiceException { JSONObject response = callAPI(API.CREATE_TYPE, typeAsJson); - return extractResults(response, AtlasClient.TYPES); + return extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() { + @Override + String extractElement(JSONObject element) throws JSONException { + return element.getString(AtlasClient.NAME); + } + }); } /** @@ -417,7 +430,12 @@ public class AtlasClient { */ public List<String> updateType(String typeAsJson) throws AtlasServiceException { JSONObject response = callAPI(API.UPDATE_TYPE, typeAsJson); - return extractResults(response, AtlasClient.TYPES); + return extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() { + @Override + String extractElement(JSONObject element) throws JSONException { + return element.getString(AtlasClient.NAME); + } + }); } /** @@ -432,7 +450,7 @@ public class AtlasClient { public List<String> listTypes() throws AtlasServiceException { final JSONObject jsonObject = callAPI(API.LIST_TYPES, null); - return extractResults(jsonObject, AtlasClient.RESULTS); + return extractResults(jsonObject, AtlasClient.RESULTS, new ExtractOperation<String, String>()); } public String getType(String typeName) throws AtlasServiceException { @@ -611,7 +629,7 @@ public class AtlasClient { return resource; } }); - return extractResults(jsonResponse, GUID); + return extractResults(jsonResponse, 
GUID, new ExtractOperation<String, String>()); } /** @@ -621,14 +639,15 @@ public class AtlasClient { * @param uniqueAttributeValue Attribute Value that uniquely identifies the entity * @return List of deleted entity guids(including composite references from that entity) */ - public List<String> deleteEntity(String entityType, String uniqueAttributeName, String uniqueAttributeValue) throws AtlasServiceException { + public List<String> deleteEntity(String entityType, String uniqueAttributeName, String uniqueAttributeValue) + throws AtlasServiceException { API api = API.DELETE_ENTITY; WebResource resource = getResource(api); resource = resource.queryParam(TYPE, entityType); resource = resource.queryParam(ATTRIBUTE_NAME, uniqueAttributeName); resource = resource.queryParam(ATTRIBUTE_VALUE, uniqueAttributeValue); JSONObject jsonResponse = callAPIWithResource(API.DELETE_ENTITIES, resource, null); - return extractResults(jsonResponse, GUID); + return extractResults(jsonResponse, GUID, new ExtractOperation<String, String>()); } /** @@ -698,20 +717,23 @@ public class AtlasClient { return resource; } }); - return extractResults(jsonResponse, AtlasClient.RESULTS); + return extractResults(jsonResponse, AtlasClient.RESULTS, new ExtractOperation<String, String>()); + } + + private class ExtractOperation<T, U> { + T extractElement(U element) throws JSONException { + return (T) element; + } } - private List<String> extractResults(JSONObject jsonResponse, String key) throws AtlasServiceException { + private <T, U> List<T> extractResults(JSONObject jsonResponse, String key, ExtractOperation<T, U> extractInterafce) + throws AtlasServiceException { try { JSONArray results = jsonResponse.getJSONArray(key); - ArrayList<String> resultsList = new ArrayList<>(); + ArrayList<T> resultsList = new ArrayList<>(); for (int index = 0; index < results.length(); index++) { Object element = results.get(index); - if (element instanceof String) { - resultsList.add((String) element); - } else if 
(element instanceof JSONObject) { - resultsList.add(((JSONObject) element).getString(AtlasClient.NAME)); - } + resultsList.add(extractInterafce.extractElement((U) element)); } return resultsList; } catch (JSONException e) { @@ -720,6 +742,44 @@ public class AtlasClient { } /** + * Get the latest numResults entity audit events in decreasing order of timestamp for the given entity id + * @param entityId entity id + * @param numResults number of results to be returned + * @return list of audit events for the entity id + * @throws AtlasServiceException + */ + public List<EntityAuditEvent> getEntityAuditEvents(String entityId, short numResults) + throws AtlasServiceException { + return getEntityAuditEvents(entityId, null, numResults); + } + + /** + * Get the entity audit events in decreasing order of timestamp for the given entity id + * @param entityId entity id + * @param startKey key for the first event to be returned, used for pagination + * @param numResults number of results to be returned + * @return list of audit events for the entity id + * @throws AtlasServiceException + */ + public List<EntityAuditEvent> getEntityAuditEvents(String entityId, String startKey, short numResults) + throws AtlasServiceException { + WebResource resource = getResource(API.LIST_ENTITY_AUDIT, entityId, URI_ENTITY_AUDIT); + if (StringUtils.isNotEmpty(startKey)) { + resource = resource.queryParam(START_KEY, startKey); + } + resource = resource.queryParam(NUM_RESULTS, String.valueOf(numResults)); + + JSONObject jsonResponse = callAPIWithResource(API.LIST_ENTITY_AUDIT, resource, null); + return extractResults(jsonResponse, AtlasClient.EVENTS, new ExtractOperation<EntityAuditEvent, JSONObject>() { + @Override + EntityAuditEvent extractElement(JSONObject element) throws JSONException { + return EntityAuditEvent.GSON.fromJson(element.toString(), EntityAuditEvent.class); + } + }); + + } + + /** * Search using gremlin/dsl/full text * @param searchQuery * @return 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/client/src/main/java/org/apache/atlas/EntityAuditEvent.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/EntityAuditEvent.java b/client/src/main/java/org/apache/atlas/EntityAuditEvent.java new file mode 100644 index 0000000..460f708 --- /dev/null +++ b/client/src/main/java/org/apache/atlas/EntityAuditEvent.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import org.apache.commons.lang.StringUtils; + +/** + * Structure of entity audit event + */ +public class EntityAuditEvent { + public static final Gson GSON = new GsonBuilder().create(); + + public enum EntityAuditAction { + ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, TAG_ADD, TAG_DELETE + } + + private String entityId; + private long timestamp; + private String user; + private EntityAuditAction action; + private String details; + private String eventKey; + + public EntityAuditEvent() { + } + + public EntityAuditEvent(String entityId, Long ts, String user, EntityAuditAction action, String details) { + this.entityId = entityId; + this.timestamp = ts; + this.user = user; + this.action = action; + this.details = details; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (!(other instanceof EntityAuditEvent)) { + return false; + } + + EntityAuditEvent otherEvent = (EntityAuditEvent) other; + return StringUtils.equals(entityId, otherEvent.entityId) && + (timestamp == otherEvent.timestamp) && + StringUtils.equals(user, otherEvent.user) && (action == otherEvent.action) && + StringUtils.equals(details, otherEvent.details) && + StringUtils.equals(eventKey, otherEvent.eventKey); + } + + @Override + public int hashCode() { + return toString().hashCode(); + } + + @Override + public String toString() { + return GSON.toJson(this); + } + + public static EntityAuditEvent fromString(String eventString) { + return GSON.fromJson(eventString, EntityAuditEvent.class); + } + + public String getEntityId() { + return entityId; + } + + public long getTimestamp() { + return timestamp; + } + + public String getUser() { + return user; + } + + public EntityAuditAction getAction() { + return action; + } + + public String getDetails() { + return details; + } + + public void setEntityId(String entityId) { + this.entityId = entityId; + } 
+ + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public void setUser(String user) { + this.user = user; + } + + public void setAction(EntityAuditAction action) { + this.action = action; + } + + public void setDetails(String details) { + this.details = details; + } + + public String getEventKey() { + return eventKey; + } + + public void setEventKey(String eventKey) { + this.eventKey = eventKey; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/common/src/main/java/org/apache/atlas/utils/ParamChecker.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/atlas/utils/ParamChecker.java b/common/src/main/java/org/apache/atlas/utils/ParamChecker.java index 91adfaf..edfe355 100644 --- a/common/src/main/java/org/apache/atlas/utils/ParamChecker.java +++ b/common/src/main/java/org/apache/atlas/utils/ParamChecker.java @@ -151,4 +151,19 @@ public final class ParamChecker { } return list; } + + /** + * Checks that the given value is <= max value. 
+ * @param value + * @param maxValue + * @param name + */ + public static void lessThan(short value, short maxValue, String name) { + if (value <= 0) { + throw new IllegalArgumentException(name + " should be > 0, current value " + value); + } + if (value > maxValue) { + throw new IllegalArgumentException(name + " should be <= " + maxValue + ", current value " + value); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/distro/pom.xml ---------------------------------------------------------------------- diff --git a/distro/pom.xml b/distro/pom.xml index 0de8ccf..f623a74 100644 --- a/distro/pom.xml +++ b/distro/pom.xml @@ -88,6 +88,7 @@ src="${hbase.tar}" dest="${project.build.directory}/hbase.tar.gz" usetimestamp="true" + verbose="true" skipexisting="true" /> <untar src="${project.build.directory}/hbase.tar.gz" @@ -118,6 +119,7 @@ <descriptor>src/main/assemblies/src-package.xml</descriptor> </descriptors> <finalName>apache-atlas-${project.version}</finalName> + <tarLongFileMode>gnu</tarLongFileMode> </configuration> </execution> </executions> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/distro/src/conf/atlas-application.properties ---------------------------------------------------------------------- diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties index c80d2f0..7d32717 100755 --- a/distro/src/conf/atlas-application.properties +++ b/distro/src/conf/atlas-application.properties @@ -97,6 +97,11 @@ atlas.http.authentication.type=simple ######### Server Properties ######### atlas.rest.address=http://localhost:21000 +######### Entity Audit Configs ######### +atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS +atlas.audit.zookeeper.session.timeout.ms=1000 +atlas.audit.hbase.zookeeper.quorum=localhost:2181 + ######### High Availability Configuration ######## atlas.server.ha.enabled=false #### Enabled the configs below as per need if HA is enabled ##### 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java index 7e09a19..68a868f 100644 --- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java +++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java @@ -46,9 +46,6 @@ public abstract class AtlasHook { private static final Logger LOG = LoggerFactory.getLogger(AtlasHook.class); - /** - * Hadoop Cluster name for this instance, typically used for namespace. - */ protected static Configuration atlasProperties; protected static NotificationInterface notifInterface; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java b/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java index e8ae177..44d08d3 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java @@ -19,15 +19,13 @@ package org.apache.atlas.notification; import com.google.inject.AbstractModule; import com.google.inject.Singleton; -import com.google.inject.multibindings.Multibinder; import org.apache.atlas.kafka.KafkaNotification; import org.apache.atlas.kafka.KafkaNotificationProvider; -import org.apache.atlas.listener.EntityChangeListener; -import org.apache.atlas.notification.entity.NotificationEntityChangeListener; -import org.apache.atlas.service.Service; /** * Notification module for Guice. 
+ * + * NOTE: This module is loaded by hook clients like hive hook etc. Don't add any server specific bindings here. */ public class NotificationModule extends AbstractModule { @@ -35,14 +33,5 @@ public class NotificationModule extends AbstractModule { protected void configure() { bind(NotificationInterface.class).to(KafkaNotification.class).in(Singleton.class); bind(KafkaNotification.class).toProvider(KafkaNotificationProvider.class).in(Singleton.class); - - Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder(), Service.class); - serviceBinder.addBinding().to(KafkaNotification.class); - serviceBinder.addBinding().to(NotificationHookConsumer.class); - - //Add NotificationEntityChangeListener as EntityChangeListener - Multibinder<EntityChangeListener> entityChangeListenerBinder = - Multibinder.newSetBinder(binder(), EntityChangeListener.class); - entityChangeListenerBinder.addBinding().to(NotificationEntityChangeListener.class); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 226529d..03828e9 100755 --- a/pom.xml +++ b/pom.xml @@ -894,6 +894,7 @@ <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>${hbase.version}</version> + <scope>test</scope> <exclusions> <exclusion> <groupId>org.mortbay.jetty</groupId> @@ -1481,8 +1482,6 @@ <user.dir>${project.basedir}</user.dir> <atlas.data>${project.build.directory}/data</atlas.data> <log4j.configuration>atlas-log4j.xml</log4j.configuration> - <zookeeper.client.secure>false</zookeeper.client.secure> - <zookeeper.sasl.client>false</zookeeper.sasl.client> </systemProperties> <skipTests>${skipTests}</skipTests> <forkMode>always</forkMode> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 
cdaca63..caa670e 100644 --- a/release-log.txt +++ b/release-log.txt @@ -14,6 +14,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags) ALL CHANGES: +ATLAS-540 API to retrieve entity version events (shwethags) ATLAS-529 support drop database (sumasai) ATLAS-528 Support drop table,view (sumasai) ATLAS-603 Document High Availability of Atlas (yhemanth via sumasai) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/repository/pom.xml ---------------------------------------------------------------------- diff --git a/repository/pom.xml b/repository/pom.xml index eca087a..533e48b 100755 --- a/repository/pom.xml +++ b/repository/pom.xml @@ -141,6 +141,11 @@ <dependency> <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <classifier>tests</classifier> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java index ce1bdfb..75d14f0 100755 --- a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java +++ b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java @@ -34,13 +34,14 @@ import org.apache.atlas.listener.TypesChangeListener; import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.audit.EntityAuditListener; import org.apache.atlas.repository.audit.EntityAuditRepository; -import org.apache.atlas.repository.audit.InMemoryEntityAuditRepository; +import 
org.apache.atlas.repository.audit.HBaseBasedAuditRepository; import org.apache.atlas.repository.graph.GraphBackedMetadataRepository; import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; import org.apache.atlas.repository.graph.GraphProvider; import org.apache.atlas.repository.graph.TitanGraphProvider; import org.apache.atlas.repository.typestore.GraphBackedTypeStore; import org.apache.atlas.repository.typestore.ITypeStore; +import org.apache.atlas.service.Service; import org.apache.atlas.services.DefaultMetadataService; import org.apache.atlas.services.IBootstrapTypesRegistrar; import org.apache.atlas.services.MetadataService; @@ -95,15 +96,11 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule { } protected void bindAuditRepository(Binder binder) { - /** Enable this after ATLAS-498 is committed //Map EntityAuditRepository interface to hbase based implementation binder.bind(EntityAuditRepository.class).to(HBaseBasedAuditRepository.class).asEagerSingleton(); //Add HBaseBasedAuditRepository to service so that connection is closed at shutdown - Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder(), Service.class); + Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder, Service.class); serviceBinder.addBinding().to(HBaseBasedAuditRepository.class); - **/ - //Map EntityAuditRepository interface to hbase based implementation - binder.bind(EntityAuditRepository.class).to(InMemoryEntityAuditRepository.class).asEagerSingleton(); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java index 0c5c551..7f77feb 100644 --- 
a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java @@ -18,14 +18,15 @@ package org.apache.atlas.repository.audit; +import com.google.inject.Inject; import org.apache.atlas.AtlasException; +import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.RequestContext; import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.ITypedReferenceableInstance; import org.apache.atlas.typesystem.json.InstanceSerialization; -import javax.inject.Inject; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -43,51 +44,55 @@ public class EntityAuditListener implements EntityChangeListener { @Override public void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities) throws AtlasException { - List<EntityAuditRepository.EntityAuditEvent> events = new ArrayList<>(); + List<EntityAuditEvent> events = new ArrayList<>(); long currentTime = System.currentTimeMillis(); for (ITypedReferenceableInstance entity : entities) { - EntityAuditRepository.EntityAuditEvent event = createEvent(entity, currentTime, - EntityAuditRepository.EntityAuditAction.ENTITY_CREATE, + EntityAuditEvent event = createEvent(entity, currentTime, EntityAuditEvent.EntityAuditAction.ENTITY_CREATE, "Created: " + InstanceSerialization.toJson(entity, true)); events.add(event); } auditRepository.putEvents(events); } - private EntityAuditRepository.EntityAuditEvent createEvent(ITypedReferenceableInstance entity, long ts, - EntityAuditRepository.EntityAuditAction action, - String details) { - return new EntityAuditRepository.EntityAuditEvent(entity.getId()._getId(), ts, RequestContext.get().getUser(), - action, details); + private EntityAuditEvent createEvent(ITypedReferenceableInstance entity, long ts, + EntityAuditEvent.EntityAuditAction action, String details) { + return new 
EntityAuditEvent(entity.getId()._getId(), ts, RequestContext.get().getUser(), action, details); } @Override public void onEntitiesUpdated(Collection<ITypedReferenceableInstance> entities) throws AtlasException { - + List<EntityAuditEvent> events = new ArrayList<>(); + long currentTime = System.currentTimeMillis(); + for (ITypedReferenceableInstance entity : entities) { + EntityAuditEvent event = createEvent(entity, currentTime, EntityAuditEvent.EntityAuditAction.ENTITY_UPDATE, + "Updated: " + InstanceSerialization.toJson(entity, true)); + events.add(event); + } + auditRepository.putEvents(events); } @Override public void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException { - EntityAuditRepository.EntityAuditEvent event = createEvent(entity, System.currentTimeMillis(), - EntityAuditRepository.EntityAuditAction.TAG_ADD, + EntityAuditEvent event = createEvent(entity, System.currentTimeMillis(), + EntityAuditEvent.EntityAuditAction.TAG_ADD, "Added trait: " + InstanceSerialization.toJson(trait, true)); auditRepository.putEvents(event); } @Override public void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException { - EntityAuditRepository.EntityAuditEvent event = createEvent(entity, System.currentTimeMillis(), - EntityAuditRepository.EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName); + EntityAuditEvent event = createEvent(entity, System.currentTimeMillis(), + EntityAuditEvent.EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName); auditRepository.putEvents(event); } @Override public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities) throws AtlasException { - List<EntityAuditRepository.EntityAuditEvent> events = new ArrayList<>(); + List<EntityAuditEvent> events = new ArrayList<>(); long currentTime = System.currentTimeMillis(); for (ITypedReferenceableInstance entity : entities) { - EntityAuditRepository.EntityAuditEvent event = createEvent(entity, currentTime, 
- EntityAuditRepository.EntityAuditAction.ENTITY_DELETE, "Deleted entity"); + EntityAuditEvent event = createEvent(entity, currentTime, + EntityAuditEvent.EntityAuditAction.ENTITY_DELETE, "Deleted entity"); events.add(event); } auditRepository.putEvents(events); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java index d41c4da..417092a 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java @@ -19,7 +19,7 @@ package org.apache.atlas.repository.audit; import org.apache.atlas.AtlasException; -import org.apache.commons.lang.StringUtils; +import org.apache.atlas.EntityAuditEvent; import java.util.List; @@ -27,82 +27,6 @@ import java.util.List; * Interface for repository for storing entity audit events */ public interface EntityAuditRepository { - enum EntityAuditAction { - ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, TAG_ADD, TAG_DELETE; - } - - /** - * Structure of entity audit event - */ - class EntityAuditEvent { - String entityId; - Long timestamp; - String user; - EntityAuditAction action; - String details; - - public EntityAuditEvent() { - } - - public EntityAuditEvent(String entityId, Long ts, String user, EntityAuditAction action, String details) { - this.entityId = entityId; - this.timestamp = ts; - this.user = user; - this.action = action; - this.details = details; - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - - if (!(other instanceof EntityAuditEvent)) { - return false; - } - - EntityAuditEvent otherEvent = 
(EntityAuditEvent) other; - return StringUtils.equals(entityId, otherEvent.entityId) && - (timestamp.longValue() == otherEvent.timestamp.longValue()) && - StringUtils.equals(user, otherEvent.user) && (action == otherEvent.action) && - StringUtils.equals(details, otherEvent.details); - } - - @Override - public int hashCode() { - return toString().hashCode(); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("EntityId=").append(entityId).append(";Timestamp=").append(timestamp).append(";User=") - .append(user).append(";Action=").append(action).append(";Details=").append(details); - return builder.toString(); - } - - public String getEntityId() { - return entityId; - } - - public Long getTimestamp() { - return timestamp; - } - - public String getUser() { - return user; - } - - public EntityAuditAction getAction() { - return action; - } - - public String getDetails() { - return details; - } - } - /** * Add events to the event repository * @param events events to be added @@ -120,10 +44,10 @@ public interface EntityAuditRepository { /** * List events for the given entity id in decreasing order of timestamp, from the given timestamp. 
Returns n results * @param entityId entity id - * @param ts starting timestamp for events + * @param startKey key for the first event to be returned, used for pagination * @param n number of events to be returned * @return list of events * @throws AtlasException */ - List<EntityAuditRepository.EntityAuditEvent> listEvents(String entityId, Long ts, short n) throws AtlasException; + List<EntityAuditEvent> listEvents(String entityId, String startKey, short n) throws AtlasException; } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java index c4329a5..8f11322 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.inject.Singleton; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; +import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.service.Service; @@ -59,7 +60,7 @@ import java.util.List; * Columns -> action, user, detail * versions -> 1 * - * Note: The timestamp in the key is assumed to be timestamp in nano seconds. Since the key is entity id + timestamp, + * Note: The timestamp in the key is assumed to be timestamp in milli seconds. Since the key is entity id + timestamp, * and only 1 version is kept, there can be just 1 audit event per entity id + timestamp. This is ok for one atlas server. 
* But if there are more than one atlas servers, we should use server id in the key */ @@ -87,7 +88,7 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository * @throws AtlasException */ @Override - public void putEvents(EntityAuditRepository.EntityAuditEvent... events) throws AtlasException { + public void putEvents(EntityAuditEvent... events) throws AtlasException { putEvents(Arrays.asList(events)); } @@ -103,14 +104,12 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository try { table = connection.getTable(tableName); List<Put> puts = new ArrayList<>(events.size()); - for (EntityAuditRepository.EntityAuditEvent event : events) { + for (EntityAuditEvent event : events) { LOG.debug("Adding entity audit event {}", event); - Put put = new Put(getKey(event.entityId, event.timestamp)); - if (event.action != null) { - put.addColumn(COLUMN_FAMILY, COLUMN_ACTION, Bytes.toBytes((short)event.action.ordinal())); - } - addColumn(put, COLUMN_USER, event.user); - addColumn(put, COLUMN_DETAIL, event.details); + Put put = new Put(getKey(event.getEntityId(), event.getTimestamp())); + addColumn(put, COLUMN_ACTION, event.getAction()); + addColumn(put, COLUMN_USER, event.getUser()); + addColumn(put, COLUMN_DETAIL, event.getDetails()); puts.add(put); } table.put(puts); @@ -121,9 +120,9 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository } } - private void addColumn(Put put, byte[] columnName, String columnValue) { - if (StringUtils.isNotEmpty(columnValue)) { - put.addColumn(COLUMN_FAMILY, columnName, Bytes.toBytes(columnValue)); + private <T> void addColumn(Put put, byte[] columnName, T columnValue) { + if (columnValue != null && !columnValue.toString().isEmpty()) { + put.addColumn(COLUMN_FAMILY, columnName, Bytes.toBytes(columnValue.toString())); } } @@ -135,41 +134,58 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository } /** - * List events for the given entity id 
in decreasing order of timestamp, from the given timestamp. Returns n results + * List events for the given entity id in decreasing order of timestamp, from the given startKey. Returns n results * @param entityId entity id - * @param ts starting timestamp for events + * @param startKey key for the first event to be returned, used for pagination * @param n number of events to be returned * @return list of events * @throws AtlasException */ - public List<EntityAuditRepository.EntityAuditEvent> listEvents(String entityId, Long ts, short n) + public List<EntityAuditEvent> listEvents(String entityId, String startKey, short n) throws AtlasException { - LOG.info("Listing events for entity id {}, starting timestamp {}, #records {}", entityId, ts, n); + LOG.info("Listing events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, n); Table table = null; ResultScanner scanner = null; try { table = connection.getTable(tableName); + + /** + * Scan Details: + * In hbase, the events are stored in increasing order of timestamp. So, doing reverse scan to get the latest event first + * Page filter is set to limit the number of results returned. 
+ * Stop row is set to the entity id to avoid going past the current entity while scanning + * small is set to true to optimise RPC calls as the scanner is created per request + */ Scan scan = new Scan().setReversed(true).setFilter(new PageFilter(n)) - .setStartRow(getKey(entityId, ts)) .setStopRow(Bytes.toBytes(entityId)) .setCaching(n) .setSmall(true); + if (StringUtils.isEmpty(startKey)) { + //Set start row to entity id + max long value + byte[] entityBytes = getKey(entityId, Long.MAX_VALUE); + scan = scan.setStartRow(entityBytes); + } else { + scan = scan.setStartRow(Bytes.toBytes(startKey)); + } scanner = table.getScanner(scan); Result result; - List<EntityAuditRepository.EntityAuditEvent> events = new ArrayList<>(); + List<EntityAuditEvent> events = new ArrayList<>(); //PageFilter doesn't ensure n results are returned. The filter is per region server. //So, adding extra check on n here while ((result = scanner.next()) != null && events.size() < n) { - String key = Bytes.toString(result.getRow()); - EntityAuditRepository.EntityAuditEvent event = fromKey(key); - event.user = getResultString(result, COLUMN_USER); - event.action = - EntityAuditAction.values()[(Bytes.toShort(result.getValue(COLUMN_FAMILY, COLUMN_ACTION)))]; - event.details = getResultString(result, COLUMN_DETAIL); + EntityAuditEvent event = fromKey(result.getRow()); + + //In case the user sets random start key, guarding against random events + if (!event.getEntityId().equals(entityId)) { + continue; + } + event.setUser(getResultString(result, COLUMN_USER)); + event.setAction(EntityAuditEvent.EntityAuditAction.valueOf(getResultString(result, COLUMN_ACTION))); + event.setDetails(getResultString(result, COLUMN_DETAIL)); events.add(event); } - LOG.info("Got events for entity id {}, starting timestamp {}, #records {}", entityId, ts, events.size()); + LOG.info("Got events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, events.size()); return events; } catch (IOException e) { 
throw new AtlasException(e); @@ -183,12 +199,14 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository return Bytes.toString(result.getValue(COLUMN_FAMILY, columnName)); } - private EntityAuditEvent fromKey(String key) { + private EntityAuditEvent fromKey(byte[] keyBytes) { + String key = Bytes.toString(keyBytes); EntityAuditEvent event = new EntityAuditEvent(); if (StringUtils.isNotEmpty(key)) { String[] parts = key.split(FIELD_SEPARATOR); - event.entityId = parts[0]; - event.timestamp = Long.valueOf(parts[1]); + event.setEntityId(parts[0]); + event.setTimestamp(Long.valueOf(parts[1])); + event.setEventKey(key); } return event; } @@ -222,8 +240,9 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository } private void createTableIfNotExists() throws AtlasException { + Admin admin = null; try { - Admin admin = connection.getAdmin(); + admin = connection.getAdmin(); LOG.info("Checking if table {} exists", tableName.getNameAsString()); if (!admin.tableExists(tableName)) { LOG.info("Creating table {}", tableName.getNameAsString()); @@ -237,6 +256,8 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository } } catch (IOException e) { throw new AtlasException(e); + } finally { + close(admin); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java index df75290..93f224f 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java @@ -19,6 +19,7 @@ package 
org.apache.atlas.repository.audit; import org.apache.atlas.AtlasException; +import org.apache.atlas.EntityAuditEvent; import java.util.ArrayList; import java.util.Arrays; @@ -40,17 +41,23 @@ public class InMemoryEntityAuditRepository implements EntityAuditRepository { @Override public synchronized void putEvents(List<EntityAuditEvent> events) throws AtlasException { for (EntityAuditEvent event : events) { - auditEvents.put(event.entityId + (Long.MAX_VALUE - event.timestamp), event); + String rowKey = event.getEntityId() + (Long.MAX_VALUE - event.getTimestamp()); + event.setEventKey(rowKey); + auditEvents.put(rowKey, event); } } @Override - public List<EntityAuditEvent> listEvents(String entityId, Long ts, short maxResults) + public List<EntityAuditEvent> listEvents(String entityId, String startKey, short maxResults) throws AtlasException { List<EntityAuditEvent> events = new ArrayList<>(); - SortedMap<String, EntityAuditEvent> subMap = auditEvents.tailMap(entityId + (Long.MAX_VALUE - ts)); + String myStartKey = startKey; + if (myStartKey == null) { + myStartKey = entityId; + } + SortedMap<String, EntityAuditEvent> subMap = auditEvents.tailMap(myStartKey); for (EntityAuditEvent event : subMap.values()) { - if (events.size() < maxResults && event.entityId.equals(entityId)) { + if (events.size() < maxResults && event.getEntityId().equals(entityId)) { events.add(event); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java index 0a04c5f..9f69940 100755 --- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java +++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java @@ -22,10 
+22,10 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.inject.Provider; - import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; +import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.classification.InterfaceAudience; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.listener.ActiveStateChangeHandler; @@ -33,6 +33,7 @@ import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.listener.TypesChangeListener; import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.RepositoryException; +import org.apache.atlas.repository.audit.EntityAuditRepository; import org.apache.atlas.repository.typestore.ITypeStore; import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.ITypedReferenceableInstance; @@ -85,6 +86,9 @@ import java.util.Map; public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler { private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadataService.class); + private final short maxAuditResults; + private static final String CONFIG_MAX_AUDIT_RESULTS = "atlas.audit.maxResults"; + private static final short DEFAULT_MAX_AUDIT_RESULTS = 1000; private final TypeSystem typeSystem; private final MetadataRepository repository; @@ -97,6 +101,9 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang private boolean wasInitialized = false; @Inject + private EntityAuditRepository auditRepository; + + @Inject DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore, final IBootstrapTypesRegistrar typesRegistrar, final Collection<Provider<TypesChangeListener>> typeListenerProviders, @@ -128,6 +135,8 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang if 
(!HAConfiguration.isHAEnabled(configuration)) { restoreTypeSystem(); } + + maxAuditResults = configuration.getShort(CONFIG_MAX_AUDIT_RESULTS, DEFAULT_MAX_AUDIT_RESULTS); } private void restoreTypeSystem() throws AtlasException { @@ -211,7 +220,7 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang } private JSONObject createOrUpdateTypes(String typeDefinition, boolean isUpdate) throws AtlasException { - ParamChecker.notEmpty(typeDefinition, "type definition cannot be empty"); + ParamChecker.notEmpty(typeDefinition, "type definition"); TypesDef typesDef = validateTypeDefinition(typeDefinition); try { @@ -299,7 +308,7 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang */ @Override public String createEntities(String entityInstanceDefinition) throws AtlasException { - ParamChecker.notEmpty(entityInstanceDefinition, "Entity instance definition cannot be empty"); + ParamChecker.notEmpty(entityInstanceDefinition, "Entity instance definition"); ITypedReferenceableInstance[] typedInstances = deserializeClassInstances(entityInstanceDefinition); @@ -348,7 +357,7 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang */ @Override public String getEntityDefinition(String guid) throws AtlasException { - ParamChecker.notEmpty(guid, "guid cannot be null"); + ParamChecker.notEmpty(guid, "entity id"); final ITypedReferenceableInstance instance = repository.getEntityDefinition(guid); return InstanceSerialization.toJson(instance, true); @@ -404,7 +413,7 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang @Override public String updateEntities(String entityInstanceDefinition) throws AtlasException { - ParamChecker.notEmpty(entityInstanceDefinition, "Entity instance definition cannot be empty"); + ParamChecker.notEmpty(entityInstanceDefinition, "Entity instance definition"); ITypedReferenceableInstance[] typedInstances = 
deserializeClassInstances(entityInstanceDefinition); TypeUtils.Pair<List<String>, List<String>> guids = repository.updateEntities(typedInstances); @@ -421,9 +430,9 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang @Override public String updateEntityAttributeByGuid(final String guid, String attributeName, String value) throws AtlasException { - ParamChecker.notEmpty(guid, "guid cannot be null"); - ParamChecker.notEmpty(attributeName, "property cannot be null"); - ParamChecker.notEmpty(value, "property value cannot be null"); + ParamChecker.notEmpty(guid, "entity id"); + ParamChecker.notEmpty(attributeName, "attribute name"); + ParamChecker.notEmpty(value, "attribute value"); ITypedReferenceableInstance existInstance = validateEntityExists(guid); ClassType type = typeSystem.getDataType(ClassType.class, existInstance.getTypeName()); @@ -520,10 +529,10 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang @Override public String updateEntityByUniqueAttribute(String typeName, String uniqueAttributeName, String attrValue, Referenceable updatedEntity) throws AtlasException { - ParamChecker.notEmpty(typeName, "typeName cannot be null"); - ParamChecker.notEmpty(uniqueAttributeName, "uniqueAttributeName cannot be null"); - ParamChecker.notNull(attrValue, "value cannot be null"); - ParamChecker.notNull(updatedEntity, "updatedEntity cannot be null"); + ParamChecker.notEmpty(typeName, "typeName"); + ParamChecker.notEmpty(uniqueAttributeName, "uniqueAttributeName"); + ParamChecker.notNull(attrValue, "unique attribute value"); + ParamChecker.notNull(updatedEntity, "updatedEntity"); ITypedReferenceableInstance oldInstance = getEntityDefinitionReference(typeName, uniqueAttributeName, attrValue); @@ -535,7 +544,7 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang } private void validateTypeExists(String entityType) throws AtlasException { - ParamChecker.notEmpty(entityType, "entity type 
cannot be null"); + ParamChecker.notEmpty(entityType, "entity type"); IDataType type = typeSystem.getDataType(IDataType.class, entityType); if (type.getTypeCategory() != DataTypes.TypeCategory.CLASS) { @@ -552,7 +561,7 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang */ @Override public List<String> getTraitNames(String guid) throws AtlasException { - ParamChecker.notEmpty(guid, "entity GUID cannot be null"); + ParamChecker.notEmpty(guid, "entity id"); return repository.getTraitNames(guid); } @@ -565,8 +574,8 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang */ @Override public void addTrait(String guid, String traitInstanceDefinition) throws AtlasException { - ParamChecker.notEmpty(guid, "entity GUID cannot be null"); - ParamChecker.notEmpty(traitInstanceDefinition, "Trait instance cannot be null"); + ParamChecker.notEmpty(guid, "entity id"); + ParamChecker.notEmpty(traitInstanceDefinition, "trait instance definition"); ITypedStruct traitInstance = deserializeTraitInstance(traitInstanceDefinition); final String traitName = traitInstance.getTypeName(); @@ -594,7 +603,7 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang try { Struct traitInstance = InstanceSerialization.fromJsonStruct(traitInstanceDefinition, true); final String entityTypeName = traitInstance.getTypeName(); - ParamChecker.notEmpty(entityTypeName, "entity type cannot be null"); + ParamChecker.notEmpty(entityTypeName, "entity type"); TraitType traitType = typeSystem.getDataType(TraitType.class, entityTypeName); return traitType.convert(traitInstance, Multiplicity.REQUIRED); @@ -614,8 +623,8 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang */ @Override public void deleteTrait(String guid, String traitNameToBeDeleted) throws AtlasException { - ParamChecker.notEmpty(guid, "entity GUID cannot be null"); - ParamChecker.notEmpty(traitNameToBeDeleted, "Trait name cannot be 
null"); + ParamChecker.notEmpty(guid, "entity id"); + ParamChecker.notEmpty(traitNameToBeDeleted, "trait name"); // ensure trait type is already registered with the TS if (!typeSystem.isRegistered(traitNameToBeDeleted)) { @@ -685,6 +694,15 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang entityChangeListeners.remove(listener); } + @Override + public List<EntityAuditEvent> getAuditEvents(String guid, String startKey, short count) throws AtlasException { + ParamChecker.notEmpty(guid, "entity id"); + ParamChecker.notEmptyIfNotNull(startKey, "start key"); + ParamChecker.lessThan(count, maxAuditResults, "count"); + + return auditRepository.listEvents(guid, startKey, count); + } + /* (non-Javadoc) * @see org.apache.atlas.services.MetadataService#deleteEntities(java.lang.String) */ http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java index 9c193f7..be407a5 100644 --- a/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java +++ b/repository/src/test/java/org/apache/atlas/repository/audit/AuditRepositoryTestBase.java @@ -18,6 +18,7 @@ package org.apache.atlas.repository.audit; +import org.apache.atlas.EntityAuditEvent; import org.apache.commons.lang.RandomStringUtils; import org.testng.annotations.Test; @@ -25,6 +26,7 @@ import java.util.ArrayList; import java.util.List; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; public class AuditRepositoryTestBase { protected EntityAuditRepository eventRepository; @@ -35,16 +37,15 @@ public class AuditRepositoryTestBase { @Test public void testAddEvents() throws 
Exception { - EntityAuditRepository.EntityAuditEvent event = - new EntityAuditRepository.EntityAuditEvent(rand(), System.currentTimeMillis(), "u1", - EntityAuditRepository.EntityAuditAction.ENTITY_CREATE, "d1"); + EntityAuditEvent event = new EntityAuditEvent(rand(), System.currentTimeMillis(), "u1", + EntityAuditEvent.EntityAuditAction.ENTITY_CREATE, "d1"); eventRepository.putEvents(event); - List<EntityAuditRepository.EntityAuditEvent> events = - eventRepository.listEvents(event.entityId, System.currentTimeMillis(), (short) 10); + List<EntityAuditEvent> events = + eventRepository.listEvents(event.getEntityId(), null, (short) 10); assertEquals(events.size(), 1); - assertEquals(events.get(0), event); + assertEventEquals(events.get(0), event); } @Test @@ -53,29 +54,46 @@ public class AuditRepositoryTestBase { String id2 = "id2" + rand(); String id3 = "id3" + rand(); long ts = System.currentTimeMillis(); - List<EntityAuditRepository.EntityAuditEvent> expectedEvents = new ArrayList<>(3); + List<EntityAuditEvent> expectedEvents = new ArrayList<>(3); for (int i = 0; i < 3; i++) { //Add events for both ids - EntityAuditRepository.EntityAuditEvent event = - new EntityAuditRepository.EntityAuditEvent(id2, ts - i, "user" + i, - EntityAuditRepository.EntityAuditAction.ENTITY_UPDATE, "details" + i); + EntityAuditEvent event = new EntityAuditEvent(id2, ts - i, "user" + i, + EntityAuditEvent.EntityAuditAction.ENTITY_UPDATE, "details" + i); eventRepository.putEvents(event); expectedEvents.add(event); - eventRepository.putEvents(new EntityAuditRepository.EntityAuditEvent(id1, ts - i, "user" + i, - EntityAuditRepository.EntityAuditAction.TAG_DELETE, "details" + i)); - eventRepository.putEvents(new EntityAuditRepository.EntityAuditEvent(id3, ts - i, "user" + i, - EntityAuditRepository.EntityAuditAction.TAG_ADD, "details" + i)); + eventRepository.putEvents(new EntityAuditEvent(id1, ts - i, "user" + i, + EntityAuditEvent.EntityAuditAction.TAG_DELETE, "details" + i)); + 
eventRepository.putEvents(new EntityAuditEvent(id3, ts - i, "user" + i, + EntityAuditEvent.EntityAuditAction.TAG_ADD, "details" + i)); } //Use ts for which there is no event - ts + 2 - List<EntityAuditRepository.EntityAuditEvent> events = eventRepository.listEvents(id2, ts + 2, (short) 2); - assertEquals(events.size(), 2); - assertEquals(events.get(0), expectedEvents.get(0)); - assertEquals(events.get(1), expectedEvents.get(1)); + List<EntityAuditEvent> events = eventRepository.listEvents(id2, null, (short) 3); + assertEquals(events.size(), 3); + assertEventEquals(events.get(0), expectedEvents.get(0)); + assertEventEquals(events.get(1), expectedEvents.get(1)); + assertEventEquals(events.get(2), expectedEvents.get(2)); //Use last event's timestamp for next list(). Should give only 1 event and shouldn't include events from other id - events = eventRepository.listEvents(id2, events.get(1).timestamp - 1, (short) 3); + events = eventRepository.listEvents(id2, events.get(2).getEventKey(), (short) 3); assertEquals(events.size(), 1); - assertEquals(events.get(0), expectedEvents.get(2)); + assertEventEquals(events.get(0), expectedEvents.get(2)); + } + + @Test + public void testInvalidEntityId() throws Exception { + List<EntityAuditEvent> events = eventRepository.listEvents(rand(), null, (short) 3); + assertEquals(events.size(), 0); + } + + private void assertEventEquals(EntityAuditEvent actual, EntityAuditEvent expected) { + if (expected != null) { + assertNotNull(actual); + } + + assertEquals(actual.getEntityId(), expected.getEntityId()); + assertEquals(actual.getAction(), expected.getAction()); + assertEquals(actual.getTimestamp(), expected.getTimestamp()); + assertEquals(actual.getDetails(), expected.getDetails()); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java ---------------------------------------------------------------------- diff --git 
a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java index 156eb3d..1156b67 100644 --- a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java @@ -35,6 +35,7 @@ import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition; import org.apache.atlas.typesystem.types.utils.TypesUtil; import org.apache.atlas.utils.ParamChecker; import org.apache.atlas.AtlasException; +import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.RepositoryMetadataModule; import org.apache.atlas.RequestContext; import org.apache.atlas.TestUtils; @@ -72,6 +73,7 @@ import java.util.List; import java.util.Map; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -211,7 +213,7 @@ public class DefaultMetadataServiceTest { //name is the unique attribute Referenceable entity = createDBEntity(); String id = createInstance(entity); - assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.ENTITY_CREATE); + assertAuditEvents(id, EntityAuditEvent.EntityAuditAction.ENTITY_CREATE); //using the same name should succeed, but not create another entity String newId = createInstance(entity); @@ -228,28 +230,36 @@ public class DefaultMetadataServiceTest { //create entity Referenceable entity = createDBEntity(); String id = createInstance(entity); - assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.ENTITY_CREATE); + assertAuditEvents(id, EntityAuditEvent.EntityAuditAction.ENTITY_CREATE); Struct tag = new Struct(TestUtils.PII); metadataService.addTrait(id, InstanceSerialization.toJson(tag, true)); - assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.TAG_ADD); + 
assertAuditEvents(id, EntityAuditEvent.EntityAuditAction.TAG_ADD); metadataService.deleteTrait(id, TestUtils.PII); - assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.TAG_DELETE); + assertAuditEvents(id, EntityAuditEvent.EntityAuditAction.TAG_DELETE); + + metadataService.updateEntityAttributeByGuid(id, "description", "new description"); + assertAuditEvents(id, EntityAuditEvent.EntityAuditAction.ENTITY_UPDATE); metadataService.deleteEntities(Arrays.asList(id)); - assertAuditEvents(id, EntityAuditRepository.EntityAuditAction.ENTITY_DELETE); + assertAuditEvents(id, EntityAuditEvent.EntityAuditAction.ENTITY_DELETE); } - private void assertAuditEvents(String id, EntityAuditRepository.EntityAuditAction action) throws Exception { - List<EntityAuditRepository.EntityAuditEvent> events = - repository.listEvents(id, System.currentTimeMillis(), (short) 10); - for (EntityAuditRepository.EntityAuditEvent event : events) { - if (event.getAction() == action) { + private void assertAuditEvents(String id, EntityAuditEvent.EntityAuditAction expectedAction) throws Exception { + List<EntityAuditEvent> events = repository.listEvents(id, null, (short) 10); + for (EntityAuditEvent event : events) { + if (event.getAction() == expectedAction) { return; } } - fail("Didn't find " + action + " in audit events"); + fail("Expected audit action " + expectedAction); + } + + private void assertAuditEvents(String entityId, int numEvents) throws Exception { + List<EntityAuditEvent> events = repository.listEvents(entityId, null, (short)numEvents); + assertNotNull(events); + assertEquals(events.size(), numEvents); } @Test @@ -257,6 +267,10 @@ public class DefaultMetadataServiceTest { Referenceable db = createDBEntity(); String dbId = createInstance(db); + //Assert that there is just 1 audit event and that's for entity create + assertAuditEvents(dbId, 1); + assertAuditEvents(dbId, EntityAuditEvent.EntityAuditAction.ENTITY_CREATE); + Referenceable table = new
Referenceable(TestUtils.TABLE_TYPE); table.set(NAME, TestUtils.randomString()); table.set("description", "random table"); @@ -272,6 +286,9 @@ public class DefaultMetadataServiceTest { Referenceable tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true); Referenceable actualDb = (Referenceable) tableDefinition.get("databaseComposite"); Assert.assertEquals(actualDb.getId().id, dbId); + + //Assert that as part of table create, db is not created and audit event is not added to db + assertAuditEvents(dbId, 1); } @Test @@ -280,7 +297,8 @@ public class DefaultMetadataServiceTest { Referenceable tableUpdated = new Referenceable(TestUtils.TABLE_TYPE, new HashMap<String, Object>() {{ put("columnNames", colNameList); }}); - metadataService.updateEntityByUniqueAttribute(table.getTypeName(), NAME, (String) table.get(NAME), tableUpdated); + metadataService.updateEntityByUniqueAttribute(table.getTypeName(), NAME, (String) table.get(NAME), + tableUpdated); String tableDefinitionJson = metadataService.getEntityDefinition(TestUtils.TABLE_TYPE, NAME, (String) table.get(NAME)); @@ -291,7 +309,6 @@ public class DefaultMetadataServiceTest { @Test public void testUpdateEntityWithMap() throws Exception { - final Map<String, Struct> partsMap = new HashMap<>(); partsMap.put("part0", new Struct(TestUtils.PARTITION_STRUCT_TYPE, new HashMap<String, Object>() {{ @@ -599,8 +616,8 @@ public class DefaultMetadataServiceTest { //ATLAS-383 Test Referenceable sdReferenceable = new Referenceable(TestUtils.STORAGE_DESC_TYPE); sdReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, TestUtils.randomString()); - sdReferenceable.set("compressed", "false"); - sdReferenceable.set("location", "hdfs://tmp/hive-user"); + sdReferenceable.set("compressed", "false"); + sdReferenceable.set("location", "hdfs://tmp/hive-user"); String sdGuid = createInstance(sdReferenceable); Referenceable sdRef2 = new Referenceable(sdGuid, TestUtils.STORAGE_DESC_TYPE, null); @@ -631,7 +648,7 @@
public class DefaultMetadataServiceTest { metadataService.getEntityDefinition(tableId._getId()); Referenceable tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true); - Assert.assertEquals(dbId, (((Id)tableDefinition.get("database"))._getId())); + Assert.assertEquals(dbId, (((Id) tableDefinition.get("database"))._getId())); /* Update with referenceable - TODO - Fails . Need to fix this */ /*final String dbName = TestUtils.randomString(); @@ -786,7 +803,7 @@ public class DefaultMetadataServiceTest { //Update optional Attribute Assert.assertNotNull(tableDefinition.get("created")); //Update optional attribute - table.setNull("created"); + table.setNull("created"); String newtableId = updateInstance(table); Assert.assertEquals(newtableId, tableId._getId()); @@ -798,7 +815,7 @@ public class DefaultMetadataServiceTest { } @Test - public void testCreateEntityWithEnum() throws Exception { + public void testCreateEntityWithEnum ()throws Exception { String tableDefinitionJson = metadataService.getEntityDefinition(TestUtils.TABLE_TYPE, NAME, (String) table.get(NAME)); Referenceable tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true); @@ -1006,6 +1023,57 @@ public class DefaultMetadataServiceTest { Assert.assertNotNull(typeDefinition); } + @Test + public void testAuditEventsInvalidParams() throws Exception { + //entity id can't be null + try { + metadataService.getAuditEvents(null, "key", (short) 10); + fail("expected IllegalArgumentException"); + } catch(IllegalArgumentException e) { + //expected IllegalArgumentException + assertEquals(e.getMessage(), "entity id cannot be null"); + } + + //entity id can't be empty + try { + metadataService.getAuditEvents("", "key", (short) 10); + fail("expected IllegalArgumentException"); + } catch(IllegalArgumentException e) { + //expected IllegalArgumentException + assertEquals(e.getMessage(), "entity id cannot be empty"); + } + + //start key can be null + 
metadataService.getAuditEvents("id", null, (short) 10); + + //start key can't be empty + try { + metadataService.getAuditEvents("id", "", (short) 10); + fail("expected IllegalArgumentException"); + } catch(IllegalArgumentException e) { + //expected IllegalArgumentException + assertEquals(e.getMessage(), "start key cannot be empty"); + } + + //number of results can't be > max value + try { + metadataService.getAuditEvents("id", "key", (short) 10000); + fail("expected IllegalArgumentException"); + } catch(IllegalArgumentException e) { + //expected IllegalArgumentException + assertEquals(e.getMessage(), "count should be <= 1000, current value 10000"); + } + + //number of results can't be <= 0 + try { + metadataService.getAuditEvents("id", "key", (short) -1); + fail("expected IllegalArgumentException"); + } catch(IllegalArgumentException e) { + //expected IllegalArgumentException + assertEquals(e.getMessage(), "count should be > 0, current value -1"); + } + } + private static class DeleteEntitiesChangeListener implements EntityChangeListener { private Collection<ITypedReferenceableInstance> deletedEntities_; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/c2356f8e/server-api/pom.xml ---------------------------------------------------------------------- diff --git a/server-api/pom.xml b/server-api/pom.xml index d3e84c4..9da3279 100644 --- a/server-api/pom.xml +++ b/server-api/pom.xml @@ -52,10 +52,12 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> </dependency> + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> </dependency> + <dependency> <groupId>org.apache.atlas</groupId> <artifactId>atlas-client</artifactId>
