Repository: incubator-atlas Updated Branches: refs/heads/branch-0.6-incubating dc2ca4520 -> dad90970d
ATLAS-386 Handle hive rename Table (shwethags) Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/dad90970 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/dad90970 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/dad90970 Branch: refs/heads/branch-0.6-incubating Commit: dad90970da80038757619bd752e72a16e27d36c2 Parents: dc2ca45 Author: Shwetha GS <[email protected]> Authored: Mon Dec 14 22:15:01 2015 +0530 Committer: Shwetha GS <[email protected]> Committed: Mon Dec 14 22:15:01 2015 +0530 ---------------------------------------------------------------------- addons/hive-bridge/pom.xml | 4 + .../atlas/hive/bridge/HiveMetaStoreBridge.java | 4 + .../org/apache/atlas/hive/hook/HiveHook.java | 112 ++++++------ .../org/apache/atlas/hive/hook/HiveHookIT.java | 15 +- .../main/java/org/apache/atlas/AtlasClient.java | 13 +- docs/src/site/twiki/Bridge-Hive.twiki | 2 +- .../apache/atlas/kafka/KafkaNotification.java | 2 +- .../notification/AbstractNotification.java | 19 ++ .../AbstractNotificationConsumer.java | 42 ++++- .../notification/NotificationHookConsumer.java | 49 ++++-- .../notification/NotificationInterface.java | 8 +- .../NotificationEntityChangeListener.java | 32 +--- .../notification/hook/HookNotification.java | 174 +++++++++++++++++++ .../atlas/kafka/KafkaNotificationTest.java | 20 --- .../notification/hook/HookNotificationTest.java | 68 ++++++++ pom.xml | 9 +- release-log.txt | 1 + .../atlas/services/DefaultMetadataService.java | 7 + .../test/java/org/apache/atlas/TestUtils.java | 4 +- .../graph/TitanGraphProviderTest.java | 3 - .../service/DefaultMetadataServiceTest.java | 46 +++-- .../apache/atlas/typesystem/Referenceable.java | 6 +- .../atlas/typesystem/types/ClassType.java | 6 +- .../atlas/typesystem/types/Multiplicity.java | 4 +- .../src/main/resources/application.properties | 6 +- webapp/pom.xml | 4 + .../src/main/java/org/apache/atlas/Atlas.java | 4 + .../NotificationHookConsumerIT.java | 111 ++++++++---- .../web/resources/EntityJerseyResourceIT.java | 31 ++++ 29 files changed, 602 insertions(+), 204 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/addons/hive-bridge/pom.xml ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml index db842d7..4b0ac0f 100755 --- a/addons/hive-bridge/pom.xml +++ b/addons/hive-bridge/pom.xml @@ -290,6 +290,10 @@ <name>atlas.log.dir</name> <value>${project.build.directory}/logs</value> </systemProperty> + <systemProperty> + <name>atlas.data</name> + <value>${project.build.directory}/data</value> + </systemProperty> </systemProperties> <stopKey>atlas-stop</stopKey> <stopPort>31001</stopPort> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java index f367317..ee5ae10 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java @@ -70,6 +70,10 @@ public class HiveMetaStoreBridge { this(hiveConf, atlasConf, null, null); } + public String getClusterName() { + return clusterName; + } + /** * Construct a HiveMetaStoreBridge. * @param hiveConf hive conf http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index 2f88446..37a3169 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -25,11 +25,12 @@ import com.google.inject.Inject; import com.google.inject.Injector; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; +import org.apache.atlas.hive.model.HiveDataModelGenerator; import org.apache.atlas.hive.model.HiveDataTypes; import org.apache.atlas.notification.NotificationInterface; import org.apache.atlas.notification.NotificationModule; +import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.json.InstanceSerialization; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.conf.HiveConf; @@ -47,16 +48,12 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.security.UserGroupInformation; -import org.codehaus.jettison.json.JSONArray; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -109,6 +106,8 @@ public class HiveHook implements ExecuteWithHookContext { @Inject private static NotificationInterface notifInterface; + private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>(); + private static final HiveConf hiveConf; static { @@ -233,37 +232,51 @@ public class HiveHook implements ExecuteWithHookContext { default: } + + notifyAtlas(); } - //todo re-write with notification private void renameTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception { //crappy, no easy of getting new name assert event.inputs != null && event.inputs.size() == 1; assert event.outputs != null && event.outputs.size() > 0; - Table oldTable = event.inputs.iterator().next().getTable(); - Table newTable = null; + //Update entity if not exists + ReadEntity oldEntity = event.inputs.iterator().next(); + Table oldTable = oldEntity.getTable(); + for (WriteEntity writeEntity : event.outputs) { if (writeEntity.getType() == Entity.Type.TABLE) { - Table table = writeEntity.getTable(); - if (table.getDbName().equals(oldTable.getDbName()) && !table.getTableName() + Table newTable = writeEntity.getTable(); + if (newTable.getDbName().equals(oldTable.getDbName()) && !newTable.getTableName() .equals(oldTable.getTableName())) { - newTable = table; - break; + + //Create/update old table entity - create new entity and replace id + Referenceable tableEntity = createEntities(dgiBridge, writeEntity); + String oldQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), + oldTable.getDbName(), oldTable.getTableName()); + tableEntity.set(HiveDataModelGenerator.NAME, oldQualifiedName); + tableEntity.set(HiveDataModelGenerator.TABLE_NAME, oldTable.getTableName().toLowerCase()); + + + String newQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), + newTable.getDbName(), newTable.getTableName()); + + Referenceable newEntity = new Referenceable(HiveDataTypes.HIVE_TABLE.getName()); + newEntity.set(HiveDataModelGenerator.NAME, newQualifiedName); + newEntity.set(HiveDataModelGenerator.TABLE_NAME, newTable.getTableName().toLowerCase()); + messages.add(new HookNotification.EntityPartialUpdateRequest(HiveDataTypes.HIVE_TABLE.getName(), + HiveDataModelGenerator.NAME, oldQualifiedName, newEntity)); } } } - if (newTable == null) { - LOG.warn("Failed to deduct new name for " + event.queryStr); - return; - } } - private Map<Type, Referenceable> createEntities(HiveMetaStoreBridge dgiBridge, Entity entity) throws Exception { - Map<Type, Referenceable> entities = new LinkedHashMap<>(); + private Referenceable createEntities(HiveMetaStoreBridge dgiBridge, Entity entity) throws Exception { Database db = null; Table table = null; Partition partition = null; + List<Referenceable> entities = new ArrayList<>(); switch (entity.getType()) { case DATABASE: @@ -283,64 +296,54 @@ public class HiveHook implements ExecuteWithHookContext { } db = dgiBridge.hiveClient.getDatabase(db.getName()); - Referenceable dbReferenceable = dgiBridge.createDBInstance(db); - entities.put(Type.DATABASE, dbReferenceable); + Referenceable dbEntity = dgiBridge.createDBInstance(db); + entities.add(dbEntity); - Referenceable tableReferenceable = null; + Referenceable tableEntity = null; if (table != null) { table = dgiBridge.hiveClient.getTable(table.getDbName(), table.getTableName()); - tableReferenceable = dgiBridge.createTableInstance(dbReferenceable, table); - entities.put(Type.TABLE, tableReferenceable); + tableEntity = dgiBridge.createTableInstance(dbEntity, table); + entities.add(tableEntity); } if (partition != null) { - Referenceable partitionReferenceable = dgiBridge.createPartitionReferenceable(tableReferenceable, - (Referenceable) tableReferenceable.get("sd"), partition); - entities.put(Type.PARTITION, partitionReferenceable); + Referenceable partitionEntity = dgiBridge.createPartitionReferenceable(tableEntity, + (Referenceable) tableEntity.get("sd"), partition); + entities.add(partitionEntity); } - return entities; + + messages.add(new HookNotification.EntityUpdateRequest(entities)); + return tableEntity; } private void handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEvent event, Type entityType) throws Exception { - List<Referenceable> entities = new ArrayList<>(); for (WriteEntity entity : event.outputs) { if (entity.getType() == entityType) { - entities.addAll(createEntities(dgiBridge, entity).values()); + createEntities(dgiBridge, entity); } } - notifyEntity(entities); - } - - private void notifyEntity(Collection<Referenceable> entities) { - JSONArray entitiesArray = new JSONArray(); - for (Referenceable entity : entities) { - String entityJson = InstanceSerialization.toJson(entity, true); - entitiesArray.put(entityJson); - } - notifyEntity(entitiesArray); } /** * Notify atlas of the entity through message. The entity can be a complex entity with reference to other entities. * De-duping of entities is done on server side depending on the unique attribute on the - * @param entities */ - private void notifyEntity(JSONArray entities) { + private void notifyAtlas() { int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3); - String message = entities.toString(); + LOG.debug("Notifying atlas with messages {}", messages); int numRetries = 0; while (true) { try { - notifInterface.send(NotificationInterface.NotificationType.HOOK, message); - return; + notifInterface.send(NotificationInterface.NotificationType.HOOK, messages); + break; } catch(Exception e) { numRetries++; if(numRetries < maxRetries) { - LOG.debug("Failed to notify atlas for entity {}. Retrying", message, e); + LOG.debug("Failed to notify atlas. Retrying", e); } else { - LOG.error("Failed to notify atlas for entity {} after {} retries. Quitting", message, - maxRetries, e); + LOG.error("Failed to notify atlas after {} retries. Quitting", maxRetries, e); + break; } } } @@ -369,7 +372,7 @@ public class HiveHook implements ExecuteWithHookContext { String queryStr = normalize(event.queryStr); LOG.debug("Registering query: {}", queryStr); - List<Referenceable> entities = new ArrayList<>(); + Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName()); processReferenceable.set("name", queryStr); processReferenceable.set("operationType", event.operation.getOperationName()); @@ -379,9 +382,8 @@ public class HiveHook implements ExecuteWithHookContext { List<Referenceable> source = new ArrayList<>(); for (ReadEntity readEntity : inputs) { if (readEntity.getType() == Type.TABLE || readEntity.getType() == Type.PARTITION) { - Map<Type, Referenceable> localEntities = createEntities(dgiBridge, readEntity); - source.add(localEntities.get(Type.TABLE)); - entities.addAll(localEntities.values()); + Referenceable inTable = createEntities(dgiBridge, readEntity); + source.add(inTable); } } processReferenceable.set("inputs", source); @@ -389,9 +391,8 @@ public class HiveHook implements ExecuteWithHookContext { List<Referenceable> target = new ArrayList<>(); for (WriteEntity writeEntity : outputs) { if (writeEntity.getType() == Type.TABLE || writeEntity.getType() == Type.PARTITION) { - Map<Type, Referenceable> localEntities = createEntities(dgiBridge, writeEntity); - target.add(localEntities.get(Type.TABLE)); - entities.addAll(localEntities.values()); + Referenceable outTable = createEntities(dgiBridge, writeEntity); + target.add(outTable); } } processReferenceable.set("outputs", target); @@ -402,8 +403,7 @@ public class HiveHook implements ExecuteWithHookContext { //TODO set processReferenceable.set("queryGraph", "queryGraph"); - entities.add(processReferenceable); - notifyEntity(entities); + messages.add(new HookNotification.EntityCreateRequest(processReferenceable)); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java index 5447de5..1c3d9a4 100755 --- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java +++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java @@ -149,6 +149,17 @@ public class HiveHookIT { assertDatabaseIsRegistered(DEFAULT_DB); } + @Test + public void testRenameTable() throws Exception { + String tableName = createTable(); + String newTableName = tableName(); + runCommand(String.format("alter table %s rename to %s", tableName, newTableName)); + + assertTableIsRegistered(DEFAULT_DB, newTableName); + assertTableIsNotRegistered(DEFAULT_DB, tableName); + } + + private String assertColumnIsRegistered(String colName) throws Exception { LOG.debug("Searching for column {}", colName); String query = @@ -327,8 +338,8 @@ public class HiveHookIT { LOG.debug("Searching for partition of {}.{} with values {}", dbName, tableName, value); String dslQuery = String.format("%s as p where values = ['%s'], table where tableName = '%s', " - + "db where name = '%s' and clusterName = '%s' select p", typeName, value, - tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME); + + "db where name = '%s' and clusterName = '%s' select p", typeName, value, + tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME); return assertEntityIsRegistered(dslQuery, "p"); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/client/src/main/java/org/apache/atlas/AtlasClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java index b108b25..0a730af 100755 --- a/client/src/main/java/org/apache/atlas/AtlasClient.java +++ b/client/src/main/java/org/apache/atlas/AtlasClient.java @@ -44,6 +44,7 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.UriBuilder; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED; @@ -291,12 +292,16 @@ public class AtlasClient { } public JSONArray createEntity(Referenceable... entities) throws AtlasServiceException { + return createEntity(Arrays.asList(entities)); + } + + public JSONArray createEntity(Collection<Referenceable> entities) throws AtlasServiceException { JSONArray entityArray = getEntitiesArray(entities); return createEntity(entityArray); } - private JSONArray getEntitiesArray(Referenceable[] entities) { - JSONArray entityArray = new JSONArray(entities.length); + private JSONArray getEntitiesArray(Collection<Referenceable> entities) { + JSONArray entityArray = new JSONArray(entities.size()); for (Referenceable entity : entities) { entityArray.put(InstanceSerialization.toJson(entity, true)); } @@ -311,6 +316,10 @@ public class AtlasClient { * @throws AtlasServiceException */ public JSONArray updateEntities(Referenceable... entities) throws AtlasServiceException { + return updateEntities(Arrays.asList(entities)); + } + + public JSONArray updateEntities(Collection<Referenceable> entities) throws AtlasServiceException { JSONArray entitiesArray = getEntitiesArray(entities); JSONObject response = callAPI(API.UPDATE_ENTITY, entitiesArray.toString()); try { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/docs/src/site/twiki/Bridge-Hive.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/Bridge-Hive.twiki b/docs/src/site/twiki/Bridge-Hive.twiki index c99f046..0c7732b 100644 --- a/docs/src/site/twiki/Bridge-Hive.twiki +++ b/docs/src/site/twiki/Bridge-Hive.twiki @@ -29,7 +29,7 @@ hive_process - attribute name - <queryString> - trimmed query string in lower ca ---++ Importing Hive Metadata org.apache.atlas.hive.bridge.HiveMetaStoreBridge imports the hive metadata into Atlas using the model defined in org.apache.atlas.hive.model.HiveDataModelGenerator. import-hive.sh command can be used to facilitate this. -Set the following configuration in <atlas-conf>/client.properties and set environment variable HIVE_CONFIG to the hive conf directory: +Set the following configuration in <atlas-conf>/client.properties and set environment variable $HIVE_CONF_DIR to the hive conf directory: <verbatim> <property> <name>atlas.cluster.name</name> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index bacabeb..37467b3 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -174,7 +174,7 @@ public class KafkaNotification extends AbstractNotification implements Service { } @Override - public void send(NotificationType type, String... messages) throws NotificationException { + public void _send(NotificationType type, String... messages) throws NotificationException { if (producer == null) { createProducer(); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java index f7bb7b1..72b5a0a 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java @@ -20,6 +20,9 @@ package org.apache.atlas.notification; import org.apache.atlas.AtlasException; import org.apache.commons.configuration.Configuration; +import java.util.Arrays; +import java.util.List; + /** * Abstract notification interface implementation. */ @@ -46,4 +49,20 @@ public abstract class AbstractNotification implements NotificationInterface { protected final boolean isEmbedded() { return embedded; } + + @Override + public <T> void send(NotificationType type, List<T> messages) throws NotificationException { + String[] strMessages = new String[messages.size()]; + for (int index = 0; index < messages.size(); index++) { + strMessages[index] = AbstractNotificationConsumer.GSON.toJson(messages.get(index)); + } + _send(type, strMessages); + } + + @Override + public <T> void send(NotificationType type, T... messages) throws NotificationException { + send(type, Arrays.asList(messages)); + } + + protected abstract void _send(NotificationType type, String[] messages) throws NotificationException; } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java index 8c49d4a..42a4e7f 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java @@ -26,11 +26,16 @@ import com.google.gson.JsonDeserializationContext; import com.google.gson.JsonDeserializer; import com.google.gson.JsonElement; import com.google.gson.JsonParseException; +import com.google.gson.JsonParser; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; import com.google.gson.reflect.TypeToken; import org.apache.atlas.notification.entity.EntityNotification; import org.apache.atlas.notification.entity.EntityNotificationImpl; +import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.atlas.typesystem.IStruct; +import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Struct; import org.apache.atlas.typesystem.json.InstanceSerialization; import org.codehaus.jettison.json.JSONArray; @@ -45,13 +50,15 @@ import java.util.Map; */ public abstract class AbstractNotificationConsumer<T> implements NotificationConsumer<T> { - private static final Gson GSON = new GsonBuilder(). + public static final Gson GSON = new GsonBuilder(). registerTypeAdapter(ImmutableList.class, new ImmutableListDeserializer()). registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer()). registerTypeAdapter(EntityNotification.class, new EntityNotificationDeserializer()). registerTypeAdapter(IStruct.class, new StructDeserializer()). - registerTypeAdapter(IReferenceableInstance.class, new ReferenceableDeserializer()). - registerTypeAdapter(JSONArray.class, new JSONArrayDeserializer()). + registerTypeAdapter(IReferenceableInstance.class, new ReferenceableSerializerDeserializer()). + registerTypeAdapter(Referenceable.class, new ReferenceableSerializerDeserializer()). + registerTypeAdapter(JSONArray.class, new JSONArraySerializerDeserializer()). + registerTypeAdapter(HookNotification.HookNotificationMessage.class, new HookNotification()). create(); private final Class<T> type; @@ -136,30 +143,44 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon // ----- inner class : StructDeserializer ------------------------------- - public final static class StructDeserializer implements JsonDeserializer<IStruct> { + public final static class StructDeserializer implements JsonDeserializer<IStruct>, JsonSerializer<IStruct> { @Override public IStruct deserialize(final JsonElement json, final Type type, final JsonDeserializationContext context) throws JsonParseException { return context.deserialize(json, Struct.class); } + + @Override + public JsonElement serialize(IStruct src, Type typeOfSrc, JsonSerializationContext context) { + String instanceJson = InstanceSerialization.toJson(src, true); + return new JsonParser().parse(instanceJson).getAsJsonObject(); + } } - // ----- inner class : ReferenceableDeserializer ------------------------ + // ----- inner class : ReferenceableSerializerDeserializer ------------------------ - public final static class ReferenceableDeserializer implements JsonDeserializer<IStruct> { + public final static class ReferenceableSerializerDeserializer implements JsonDeserializer<IStruct>, + JsonSerializer<IReferenceableInstance> { @Override public IReferenceableInstance deserialize(final JsonElement json, final Type type, final JsonDeserializationContext context) throws JsonParseException { return InstanceSerialization.fromJsonReferenceable(json.toString(), true); } + + @Override + public JsonElement serialize(IReferenceableInstance src, Type typeOfSrc, JsonSerializationContext context) { + String instanceJson = InstanceSerialization.toJson(src, true); + return new JsonParser().parse(instanceJson).getAsJsonObject(); + } } - // ----- inner class : JSONArrayDeserializer ---------------------------- + // ----- inner class : JSONArraySerializerDeserializer ---------------------------- - public final static class JSONArrayDeserializer implements JsonDeserializer<JSONArray> { + public final static class JSONArraySerializerDeserializer implements JsonDeserializer<JSONArray>, + JsonSerializer<JSONArray> { @Override public JSONArray deserialize(final JsonElement json, final Type type, final JsonDeserializationContext context) throws JsonParseException { @@ -170,5 +191,10 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon throw new JsonParseException(e.getMessage(), e); } } + + @Override + public JsonElement serialize(JSONArray src, Type typeOfSrc, JsonSerializationContext context) { + return new JsonParser().parse(src.toString()).getAsJsonArray(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 1bee26f..6876758 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -23,9 +23,9 @@ import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.service.Service; import org.apache.commons.configuration.Configuration; -import org.codehaus.jettison.json.JSONArray; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,12 +57,12 @@ public class NotificationHookConsumer implements Service { String atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000"); atlasClient = new AtlasClient(atlasEndpoint); int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1); - List<NotificationConsumer<JSONArray>> consumers = + List<NotificationConsumer<HookNotification.HookNotificationMessage>> consumers = notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads); executors = Executors.newFixedThreadPool(consumers.size()); - for (final NotificationConsumer<JSONArray> consumer : consumers) { - executors.submit(new HookConsumer(consumer)); + for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : consumers) { + executors.submit(new HookConsumer(atlasClient, consumer)); } } @@ -86,15 +86,12 @@ public class NotificationHookConsumer implements Service { } class HookConsumer implements Runnable { - private final NotificationConsumer<JSONArray> consumer; + private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer; private final AtlasClient client; - public HookConsumer(NotificationConsumer<JSONArray> consumer) { - this(atlasClient, consumer); - } - - public HookConsumer(AtlasClient client, NotificationConsumer<JSONArray> consumer) { - this.client = client; + public HookConsumer(AtlasClient atlasClient, + NotificationConsumer<HookNotification.HookNotificationMessage> consumer) { + this.client = atlasClient; this.consumer = consumer; } @@ -106,14 +103,32 @@ public class NotificationHookConsumer implements Service { } while(consumer.hasNext()) { - JSONArray entityJson = consumer.next(); - LOG.info("Processing message {}", entityJson); + HookNotification.HookNotificationMessage message = consumer.next(); + try { - JSONArray guids = atlasClient.createEntity(entityJson); - LOG.info("Create entities with guid {}", guids); + switch (message.getType()) { + case ENTITY_CREATE: + HookNotification.EntityCreateRequest createRequest = + (HookNotification.EntityCreateRequest) message; + atlasClient.createEntity(createRequest.getEntities()); + break; + + case ENTITY_PARTIAL_UPDATE: + HookNotification.EntityPartialUpdateRequest partialUpdateRequest = + (HookNotification.EntityPartialUpdateRequest) message; + atlasClient.updateEntity(partialUpdateRequest.getTypeName(), + partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue(), + partialUpdateRequest.getEntity()); + break; + + case ENTITY_FULL_UPDATE: + HookNotification.EntityUpdateRequest updateRequest = + (HookNotification.EntityUpdateRequest) message; + atlasClient.updateEntities(updateRequest.getEntities()); + break; + } } catch (Exception e) { - //todo handle failures - LOG.warn("Error handling message {}", entityJson, e); + LOG.debug("Error handling message {}", message, e); } } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java index 3e68998..e4c4fd6 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java @@ -18,7 +18,7 @@ package org.apache.atlas.notification; import org.apache.atlas.notification.entity.EntityNotification; -import org.codehaus.jettison.json.JSONArray; +import org.apache.atlas.notification.hook.HookNotification; import java.util.List; @@ -28,7 +28,7 @@ public interface NotificationInterface { String PROPERTY_PREFIX = "atlas.notification"; enum NotificationType { - HOOK(JSONArray.class), ENTITIES(EntityNotification.class); + HOOK(HookNotification.HookNotificationMessage.class), ENTITIES(EntityNotification.class); private final Class classType; @@ -52,7 +52,9 @@ public interface NotificationInterface { */ <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers); - void send(NotificationType type, String... messages) throws NotificationException; + <T> void send(NotificationType type, T... messages) throws NotificationException; + + <T> void send(NotificationType type, List<T> messages) throws NotificationException; void close(); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java b/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java index e2d16df..243f93e 100644 --- a/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java +++ b/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java @@ -17,12 +17,6 @@ package org.apache.atlas.notification.entity; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonElement; -import com.google.gson.JsonParser; -import com.google.gson.JsonSerializationContext; -import com.google.gson.JsonSerializer; import org.apache.atlas.AtlasException; import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.notification.NotificationInterface; @@ -30,10 +24,8 @@ import org.apache.atlas.typesystem.IReferenceableInstance; import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.ITypedReferenceableInstance; import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.json.InstanceSerialization; import org.apache.atlas.typesystem.types.TypeSystem; -import java.lang.reflect.Type; import java.util.Collection; import java.util.Collections; import java.util.LinkedList; @@ -44,9 +36,6 @@ import java.util.List; */ public class NotificationEntityChangeListener implements EntityChangeListener { - private static final Gson GSON = new GsonBuilder(). - registerTypeAdapter(Referenceable.class, new ReferencableSerializer()).create(); - private final NotificationInterface notificationInterface; private final TypeSystem typeSystem; @@ -93,7 +82,7 @@ public class NotificationEntityChangeListener implements EntityChangeListener { // send notification of entity change private void notifyOfEntityEvent(Collection<ITypedReferenceableInstance> entityDefinitions, EntityNotification.OperationType operationType) throws AtlasException { - List<String> messages = new LinkedList<>(); + List<EntityNotification> messages = new LinkedList<>(); for (IReferenceableInstance entityDefinition : entityDefinitions) { Referenceable entity = new Referenceable(entityDefinition); @@ -101,24 +90,9 @@ public class NotificationEntityChangeListener implements EntityChangeListener { EntityNotificationImpl notification = new EntityNotificationImpl(entity, operationType, typeSystem); - messages.add(GSON.toJson(notification)); + messages.add(notification); } - notificationInterface.send(NotificationInterface.NotificationType.ENTITIES, - messages.toArray(new String[messages.size()])); - } - - - // ----- inner class : ReferencableSerializer --------------------------- - - private static class ReferencableSerializer implements JsonSerializer<Referenceable> { - - public static final JsonParser JSON_PARSER = new JsonParser(); - - @Override - public JsonElement serialize(Referenceable referenceable, Type type, - JsonSerializationContext jsonSerializationContext) { - return JSON_PARSER.parse(InstanceSerialization.toJson(referenceable, true)).getAsJsonObject(); - } + notificationInterface.send(NotificationInterface.NotificationType.ENTITIES, messages); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java new file mode 100644 index 0000000..568f58b --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification.hook; + +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.TypesDef; +import org.apache.atlas.typesystem.json.InstanceSerialization; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; + +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class HookNotification implements JsonDeserializer<HookNotification.HookNotificationMessage> { + + @Override + public HookNotificationMessage deserialize(JsonElement json, Type typeOfT, + JsonDeserializationContext context) throws JsonParseException { + if (json.isJsonArray()) { + JSONArray jsonArray = context.deserialize(json, JSONArray.class); + return new EntityCreateRequest(jsonArray); + } else { + HookNotificationType type = + context.deserialize(((JsonObject) json).get("type"), HookNotificationType.class); + switch (type) { + case ENTITY_CREATE: + return context.deserialize(json, EntityCreateRequest.class); + + case ENTITY_FULL_UPDATE: + return context.deserialize(json, EntityUpdateRequest.class); + + case ENTITY_PARTIAL_UPDATE: + return context.deserialize(json, EntityPartialUpdateRequest.class); + + case TYPE_CREATE: + case TYPE_UPDATE: + return context.deserialize(json, TypeRequest.class); + } + throw new IllegalStateException("Unhandled type " + type); + } + } + + public enum HookNotificationType { + TYPE_CREATE, TYPE_UPDATE, ENTITY_CREATE, ENTITY_PARTIAL_UPDATE, ENTITY_FULL_UPDATE + } + + public static class HookNotificationMessage { + protected HookNotificationType type; + + private HookNotificationMessage() { } + + public HookNotificationMessage(HookNotificationType type) { + this.type = type; + } + + public HookNotificationType getType() { + return type; + } + } + + public static class TypeRequest extends HookNotificationMessage { + private TypesDef typesDef; + + private TypeRequest() { } + + public TypeRequest(HookNotificationType type, TypesDef typesDef) { + super(type); + this.typesDef = typesDef; + } + + public TypesDef getTypesDef() { + return typesDef; + } + } + + public static class EntityCreateRequest extends HookNotificationMessage { + private List<Referenceable> entities; + + private EntityCreateRequest() { } + + public EntityCreateRequest(Referenceable... entities) { + super(HookNotificationType.ENTITY_CREATE); + this.entities = Arrays.asList(entities); + } + + protected EntityCreateRequest(HookNotificationType type, List<Referenceable> entities) { + super(type); + this.entities = entities; + } + + public EntityCreateRequest(JSONArray jsonArray) { + super(HookNotificationType.ENTITY_CREATE); + entities = new ArrayList<>(); + for (int index = 0; index < jsonArray.length(); index++) { + try { + entities.add(InstanceSerialization.fromJsonReferenceable(jsonArray.getString(index), true)); + } catch (JSONException e) { + throw new JsonParseException(e); + } + } + } + + public List<Referenceable> getEntities() throws JSONException { + return entities; + } + } + + public static class EntityUpdateRequest extends EntityCreateRequest { + public EntityUpdateRequest(Referenceable... entities) { + this(Arrays.asList(entities)); + } + + public EntityUpdateRequest(List<Referenceable> entities) { + super(HookNotificationType.ENTITY_FULL_UPDATE, entities); + } + } + + public static class EntityPartialUpdateRequest extends HookNotificationMessage { + private String typeName; + private String attribute; + private Referenceable entity; + private String attributeValue; + + private EntityPartialUpdateRequest() { } + + public EntityPartialUpdateRequest(String typeName, String attribute, String attributeValue, + Referenceable entity) { + super(HookNotificationType.ENTITY_PARTIAL_UPDATE); + this.typeName = typeName; + this.attribute = attribute; + this.attributeValue = attributeValue; + this.entity = entity; + } + + public String getTypeName() { + return typeName; + } + + public String getAttribute() { + return attribute; + } + + public Referenceable getEntity() { + return entity; + } + + public String getAttributeValue() { + return attributeValue; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java index 625a0b0..eb90f52 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java @@ -26,9 +26,7 @@ import org.apache.atlas.notification.NotificationConsumer; import org.apache.atlas.notification.NotificationInterface; import org.apache.atlas.notification.NotificationModule; import org.apache.commons.configuration.Configuration; -import org.apache.commons.lang.RandomStringUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.codehaus.jettison.json.JSONArray; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Guice; @@ -61,20 +59,6 @@ public class KafkaNotificationTest { } @Test - public void testSendReceiveMessage() throws Exception { - String msg1 = "[{\"message\": " + 123 + "}]"; - String msg2 = "[{\"message\": " + 456 + "}]"; - kafka.send(NotificationInterface.NotificationType.HOOK, msg1, msg2); - List<NotificationConsumer<JSONArray>> consumers = - kafka.createConsumers(NotificationInterface.NotificationType.HOOK, 1); - NotificationConsumer<JSONArray> consumer = consumers.get(0); - assertTrue(consumer.hasNext()); - assertEquals(new JSONArray(msg1), consumer.next()); - assertTrue(consumer.hasNext()); - assertEquals(new JSONArray(msg2), consumer.next()); - } - - @Test @SuppressWarnings("unchecked") public void testCreateConsumers() throws Exception { Configuration configuration = mock(Configuration.class); @@ -119,10 +103,6 @@ public class KafkaNotificationTest { assertEquals(groupId, properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG)); } - private String random() { - return RandomStringUtils.randomAlphanumeric(5); - } - @AfterClass public void teardown() throws Exception { kafka.stop(); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java new file mode 100644 index 0000000..4b9f81f --- /dev/null +++ b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification.hook; + +import org.apache.atlas.notification.AbstractNotificationConsumer; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.json.InstanceSerialization; +import org.codehaus.jettison.json.JSONArray; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +public class HookNotificationTest { + + @Test + public void testMessageBackwardCompatibility() throws Exception { + JSONArray jsonArray = new JSONArray(); + Referenceable entity = new Referenceable("sometype"); + entity.set("name", "somename"); + String entityJson = InstanceSerialization.toJson(entity, true); + jsonArray.put(entityJson); + + HookNotification.HookNotificationMessage notification = AbstractNotificationConsumer.GSON.fromJson( + jsonArray.toString(), HookNotification.HookNotificationMessage.class); + assertNotNull(notification); + assertEquals(notification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE); + HookNotification.EntityCreateRequest createRequest = (HookNotification.EntityCreateRequest) notification; + assertEquals(createRequest.getEntities().size(), 1); + assertEquals(createRequest.getEntities().get(0).getTypeName(), entity.getTypeName()); + } + + @Test + public void testNewMessageSerDe() throws Exception { + Referenceable entity1 = new Referenceable("sometype"); + entity1.set("attr", "value"); + entity1.set("complex", new Referenceable("othertype")); + Referenceable entity2 = new Referenceable("newtype"); + HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(entity1, entity2); + + String notificationJson = AbstractNotificationConsumer.GSON.toJson(request); + HookNotification.HookNotificationMessage actualNotification = AbstractNotificationConsumer.GSON.fromJson( + notificationJson, HookNotification.HookNotificationMessage.class); + assertEquals(actualNotification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE); + HookNotification.EntityCreateRequest createRequest = (HookNotification.EntityCreateRequest) actualNotification; + assertEquals(createRequest.getEntities().size(), 2); + Referenceable actualEntity1 = createRequest.getEntities().get(0); + assertEquals(actualEntity1.getTypeName(), "sometype"); + assertEquals(((Referenceable)actualEntity1.get("complex")).getTypeName(), "othertype"); + assertEquals(createRequest.getEntities().get(1).getTypeName(), "newtype"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 1a324b7..3a368f3 100755 --- a/pom.xml +++ b/pom.xml @@ -354,7 +354,8 @@ <!-- skips checkstyle and find bugs --> <skipCheck>false</skipCheck> - <skipTests>false</skipTests> + <skipUTs>false</skipUTs> + <skipITs>false</skipITs> <skipDocs>true</skipDocs> <skipSite>true</skipSite> <projectBaseDir>${project.basedir}</projectBaseDir> @@ -1087,7 +1088,7 @@ <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> - <version>2.3.1</version> + <version>2.5</version> </dependency> <dependency> @@ -1394,6 +1395,7 @@ <configuration> <systemProperties> <user.dir>${project.basedir}</user.dir> + <atlas.data>${project.build.directory}/data</atlas.data> </systemProperties> <!--<skipTests>true</skipTests>--> <forkMode>always</forkMode> @@ -1403,6 +1405,7 @@ -Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml -Djava.net.preferIPv4Stack=true </argLine> + <skip>${skipUTs}</skip> <excludes> <exclude>**/*Base*</exclude> </excludes> @@ -1423,12 +1426,14 @@ <configuration> <systemPropertyVariables> <projectBaseDir>${projectBaseDir}</projectBaseDir> + <atlas.data>${project.build.directory}/data</atlas.data> </systemPropertyVariables> <redirectTestOutputToFile>true</redirectTestOutputToFile> <argLine>-Djava.awt.headless=true -Dproject.version=${project.version} -Dhadoop.tmp.dir="${project.build.directory}/tmp-hadoop-${user.name}" -Xmx1024m -XX:MaxPermSize=512m -Dlog4j.configuration=atlas-log4j.xml </argLine> + <skip>${skipITs}</skip> <parallel>none</parallel> <reuseForks>false</reuseForks> <forkCount>1</forkCount> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index f058148..64dc568 100644 --- a/release-log.txt +++ b/release-log.txt @@ -9,6 +9,7 @@ ATLAS-54 Rename configs in hive hook (shwethags) ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags) ALL CHANGES: +ATLAS-386 Handle hive rename Table (shwethags) ATLAS-374 Doc: Create a wiki for documenting fault tolerance and HA options for Atlas data (yhemath via sumasai) ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown(yhemath via sumasai) ATLAS-382 Fixed Hive Bridge doc for ATLAS cluster name (sumasai) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java index fb782a2..f605c26 100755 --- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java +++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java @@ -304,7 +304,14 @@ public class DefaultMetadataService implements MetadataService { ParamChecker.notEmpty(entityTypeName, "Entity type cannot be null"); ClassType entityType = typeSystem.getDataType(ClassType.class, entityTypeName); + + //Both assigned id and values are required for full update + //classtype.convert() will remove values if id is assigned. So, set temp id, convert and + // then replace with original id + Id origId = entityInstance.getId(); + entityInstance.replaceWithNewId(new Id(entityInstance.getTypeName())); ITypedReferenceableInstance typedInstrance = entityType.convert(entityInstance, Multiplicity.REQUIRED); + ((ReferenceableInstance)typedInstrance).replaceWithNewId(origId); instances[index] = typedInstrance; } return instances; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/repository/src/test/java/org/apache/atlas/TestUtils.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/TestUtils.java b/repository/src/test/java/org/apache/atlas/TestUtils.java index 12c47d4..2a45bc8 100755 --- a/repository/src/test/java/org/apache/atlas/TestUtils.java +++ b/repository/src/test/java/org/apache/atlas/TestUtils.java @@ -251,12 +251,12 @@ public final class TestUtils { new AttributeDefinition("columnsMap", DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), "column_type"), - Multiplicity.COLLECTION, true, null), + Multiplicity.OPTIONAL, true, null), //map of structs new AttributeDefinition("partitionsMap", DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), "partition_type"), - Multiplicity.COLLECTION, true, null), + Multiplicity.OPTIONAL, true, null), // struct reference new AttributeDefinition("serde1", "serdeType", Multiplicity.OPTIONAL, false, null), new AttributeDefinition("serde2", "serdeType", Multiplicity.OPTIONAL, false, null), http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java index 6fc7008..d824b50 100644 --- a/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/graph/TitanGraphProviderTest.java @@ -19,12 +19,9 @@ package org.apache.atlas.repository.graph; import com.thinkaurelius.titan.core.TitanGraph; import com.thinkaurelius.titan.core.util.TitanCleanup; -import com.thinkaurelius.titan.diskstorage.Backend; -import com.thinkaurelius.titan.graphdb.database.StandardTitanGraph; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.commons.configuration.Configuration; -import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeTest; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java index 0352ef3..0307fd4 100644 --- a/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/service/DefaultMetadataServiceTest.java @@ -22,9 +22,6 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Inject; import com.thinkaurelius.titan.core.TitanGraph; import com.thinkaurelius.titan.core.util.TitanCleanup; -import org.apache.atlas.typesystem.exception.TypeNotFoundException; -import org.apache.atlas.typesystem.exception.EntityNotFoundException; -import org.apache.atlas.utils.ParamChecker; import org.apache.atlas.RepositoryMetadataModule; import org.apache.atlas.TestUtils; import org.apache.atlas.repository.graph.GraphProvider; @@ -32,12 +29,15 @@ import org.apache.atlas.services.MetadataService; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Struct; import org.apache.atlas.typesystem.TypesDef; +import org.apache.atlas.typesystem.exception.EntityNotFoundException; +import org.apache.atlas.typesystem.exception.TypeNotFoundException; import org.apache.atlas.typesystem.json.InstanceSerialization; import org.apache.atlas.typesystem.json.TypesSerialization; import org.apache.atlas.typesystem.persistence.Id; import org.apache.atlas.typesystem.types.EnumValue; import org.apache.atlas.typesystem.types.TypeSystem; import org.apache.atlas.typesystem.types.ValueConversionException; +import org.apache.atlas.utils.ParamChecker; import org.apache.commons.lang.RandomStringUtils; import org.codehaus.jettison.json.JSONArray; import org.testng.Assert; @@ -47,11 +47,15 @@ import org.testng.annotations.Guice; import org.testng.annotations.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + @Guice(modules = RepositoryMetadataModule.class) public class DefaultMetadataServiceTest { @Inject @@ -296,8 +300,8 @@ public class DefaultMetadataServiceTest { Map<String, Object> values = new HashMap<>(); values.put("name", "col1"); values.put("type", "type"); - Referenceable ref = new Referenceable("column_type", values); - columns.add(ref); + Referenceable col1 = new Referenceable("column_type", values); + columns.add(col1); Referenceable tableUpdated = new Referenceable(TestUtils.TABLE_TYPE, new HashMap<String, Object>() {{ put("columns", columns); }}); @@ -307,19 +311,18 @@ public class DefaultMetadataServiceTest { metadataService.getEntityDefinition(TestUtils.TABLE_TYPE, "name", (String) table.get("name")); Referenceable tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true); final List<Referenceable> arrClsColumns = (List) tableDefinition.get("columns"); - Assert.assertTrue(arrClsColumns.get(0).equalsContents(columns.get(0))); + assertReferenceables(arrClsColumns.get(0), columns.get(0)); //Partial update. Add col5 But also update col1 Map<String, Object> valuesCol5 = new HashMap<>(); valuesCol5.put("name", "col5"); valuesCol5.put("type", "type"); - ref = new Referenceable("column_type", valuesCol5); + Referenceable col2 = new Referenceable("column_type", valuesCol5); //update col1 - arrClsColumns.get(0).set("type", "type1"); + col1.set("type", "type1"); //add col5 - final List<Referenceable> updateColumns = new ArrayList<>(arrClsColumns); - updateColumns.add(ref); + final List<Referenceable> updateColumns = Arrays.asList(col1, col2); tableUpdated = new Referenceable(TestUtils.TABLE_TYPE, new HashMap<String, Object>() {{ put("columns", updateColumns); @@ -331,8 +334,8 @@ public class DefaultMetadataServiceTest { tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true); List<Referenceable> arrColumnsList = (List) tableDefinition.get("columns"); Assert.assertEquals(arrColumnsList.size(), 2); - Assert.assertTrue(arrColumnsList.get(0).equalsContents(updateColumns.get(0))); - Assert.assertTrue(arrColumnsList.get(1).equalsContents(updateColumns.get(1))); + assertReferenceables(arrColumnsList.get(0), updateColumns.get(0)); + assertReferenceables(arrColumnsList.get(1), updateColumns.get(1)); //Complete update. Add array elements - col3,4 Map<String, Object> values1 = new HashMap<>(); @@ -355,9 +358,8 @@ public class DefaultMetadataServiceTest { tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true); arrColumnsList = (List) tableDefinition.get("columns"); Assert.assertEquals(arrColumnsList.size(), columns.size()); - Assert.assertTrue(arrColumnsList.get(1).equalsContents(columns.get(1))); - Assert.assertTrue(arrColumnsList.get(2).equalsContents(columns.get(2))); - + assertReferenceables(arrColumnsList.get(1), columns.get(1)); + assertReferenceables(arrColumnsList.get(2), columns.get(2)); //Remove a class reference/Id and insert another reference //Also covers isComposite case since columns is a composite @@ -366,8 +368,8 @@ public class DefaultMetadataServiceTest { values.put("name", "col2"); values.put("type", "type"); - ref = new Referenceable("column_type", values); - columns.add(ref); + col1 = new Referenceable("column_type", values); + columns.add(col1); table.set("columns", columns); updateInstance(table); @@ -376,7 +378,7 @@ public class DefaultMetadataServiceTest { tableDefinition = InstanceSerialization.fromJsonReferenceable(tableDefinitionJson, true); arrColumnsList = (List) tableDefinition.get("columns"); Assert.assertEquals(arrColumnsList.size(), columns.size()); - Assert.assertTrue(arrColumnsList.get(0).equalsContents(columns.get(0))); + assertReferenceables(arrColumnsList.get(0), columns.get(0)); //Update array column to null table.setNull("columns"); @@ -389,6 +391,14 @@ public class DefaultMetadataServiceTest { Assert.assertNull(tableDefinition.get("columns")); } + private void assertReferenceables(Referenceable r1, Referenceable r2) { + assertEquals(r1.getTypeName(), r2.getTypeName()); + assertTrue(r1.getTraits().equals(r2.getTraits())); + for (String attr : r1.getValuesMap().keySet()) { + assertTrue(r1.getValuesMap().get(attr).equals(r2.getValuesMap().get(attr))); + } + //TODO assert trait instances and complex attributes + } @Test public void testStructs() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java ---------------------------------------------------------------------- diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java b/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java index b8dcc7e..aa1736d 100755 --- a/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java +++ b/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java @@ -33,7 +33,7 @@ import java.util.Map; */ public class Referenceable extends Struct implements IReferenceableInstance { - private final Id id; + private Id id; private final ImmutableMap<String, IStruct> traits; private final ImmutableList<String> traitNames; @@ -151,6 +151,10 @@ public class Referenceable extends Struct implements IReferenceableInstance { '}'; } + public void replaceWithNewId(Id id) { + this.id = id; + } + private static Map<String, IStruct> getTraits(IReferenceableInstance instance) throws AtlasException { Map<String, IStruct> traits = new HashMap<>(); for (String traitName : instance.getTraits() ) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/typesystem/src/main/java/org/apache/atlas/typesystem/types/ClassType.java ---------------------------------------------------------------------- diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/ClassType.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/ClassType.java index ac758fa..adf5f1c 100755 --- a/typesystem/src/main/java/org/apache/atlas/typesystem/types/ClassType.java +++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/ClassType.java @@ -125,9 +125,9 @@ public class ClassType extends HierarchicalType<ClassType, IReferenceableInstanc r != null ? createInstanceWithTraits(id, r, r.getTraits().toArray(new String[0])) : createInstance(id); -// if (id != null && id.isAssigned()) { -// return tr; -// } + if (id != null && id.isAssigned()) { + return tr; + } for (Map.Entry<String, AttributeInfo> e : fieldMapping.fields.entrySet()) { String attrKey = e.getKey(); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java ---------------------------------------------------------------------- diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java index 74d7f7c..a54dabc 100755 --- a/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java +++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java @@ -25,8 +25,8 @@ public final class Multiplicity { public static final Multiplicity OPTIONAL = new Multiplicity(0, 1, false); public static final Multiplicity REQUIRED = new Multiplicity(1, 1, false); - public static final Multiplicity COLLECTION = new Multiplicity(0, Integer.MAX_VALUE, false); - public static final Multiplicity SET = new Multiplicity(0, Integer.MAX_VALUE, true); + public static final Multiplicity COLLECTION = new Multiplicity(1, Integer.MAX_VALUE, false); + public static final Multiplicity SET = new Multiplicity(1, Integer.MAX_VALUE, true); public final int lower; public final int upper; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/typesystem/src/main/resources/application.properties ---------------------------------------------------------------------- diff --git a/typesystem/src/main/resources/application.properties b/typesystem/src/main/resources/application.properties index 4a351e6..d475d7e 100644 --- a/typesystem/src/main/resources/application.properties +++ b/typesystem/src/main/resources/application.properties @@ -27,7 +27,7 @@ atlas.graph.storage.backend=${titan.storage.backend} atlas.graph.index.search.backend=${titan.index.backend} #Berkeley storage directory -atlas.graph.storage.directory=target/data/berkley +atlas.graph.storage.directory=${sys:atlas.data}/berkley #hbase #For standalone mode , specify localhost @@ -38,7 +38,7 @@ atlas.graph.storage.hbase.regions-per-server=1 atlas.graph.storage.lock.wait-time=10000 #ElasticSearch -atlas.graph.index.search.directory=target/data/es +atlas.graph.index.search.directory=${sys:atlas.data}/es atlas.graph.index.search.elasticsearch.client-only=false atlas.graph.index.search.elasticsearch.local-mode=true atlas.graph.index.search.elasticsearch.create.sleep=2000 @@ -63,7 +63,7 @@ atlas.notification.embedded=true atlas.kafka.zookeeper.connect=localhost:19026 atlas.kafka.bootstrap.servers=localhost:19027 -atlas.kafka.data=target/data/kafka +atlas.kafka.data=${sys:atlas.data}/kafka atlas.kafka.zookeeper.session.timeout.ms=400 atlas.kafka.zookeeper.sync.time.ms=20 atlas.kafka.auto.commit.interval.ms=100 http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/webapp/pom.xml ---------------------------------------------------------------------- diff --git a/webapp/pom.xml b/webapp/pom.xml index 85a5f94..d893947 100755 --- a/webapp/pom.xml +++ b/webapp/pom.xml @@ -372,6 +372,10 @@ <name>atlas.home</name> <value>${project.build.directory}</value> </systemProperty> + <systemProperty> + <name>atlas.data</name> + <value>${project.build.directory}/data</value> + </systemProperty> </systemProperties> <stopKey>atlas-stop</stopKey> <stopPort>31001</stopPort> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/webapp/src/main/java/org/apache/atlas/Atlas.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/Atlas.java b/webapp/src/main/java/org/apache/atlas/Atlas.java index 5ae1e44..2d2d619 100755 --- a/webapp/src/main/java/org/apache/atlas/Atlas.java +++ b/webapp/src/main/java/org/apache/atlas/Atlas.java @@ -40,6 +40,7 @@ public final class Atlas { private static final String APP_PATH = "app"; private static final String APP_PORT = "port"; private static final String ATLAS_HOME = "atlas.home"; + private static final String ATLAS_DATA = "atlas.data"; private static final String ATLAS_LOG_DIR = "atlas.log.dir"; public static final String ATLAS_SERVER_HTTPS_PORT = "atlas.server.https.port"; public static final String ATLAS_SERVER_HTTP_PORT = "atlas.server.http.port"; @@ -110,6 +111,9 @@ public final class Atlas { if (System.getProperty(ATLAS_HOME) == null) { System.setProperty(ATLAS_HOME, "target"); } + if (System.getProperty(ATLAS_DATA) == null) { + System.setProperty(ATLAS_DATA, "target/data"); + } if (System.getProperty(ATLAS_LOG_DIR) == null) { System.setProperty(ATLAS_LOG_DIR, "target/logs"); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java index e5af26c..3a4661c 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java @@ -19,22 +19,22 @@ package org.apache.atlas.notification; import com.google.inject.Inject; +import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.json.InstanceSerialization; import org.apache.atlas.web.resources.BaseResourceIT; import org.codehaus.jettison.json.JSONArray; -import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Guice; import org.testng.annotations.Test; +import static org.testng.Assert.assertEquals; + @Guice(modules = NotificationModule.class) public class NotificationHookConsumerIT extends BaseResourceIT { @Inject private NotificationInterface kafka; - private String dbName; @BeforeClass public void setUp() throws Exception { @@ -47,57 +47,106 @@ public class NotificationHookConsumerIT extends BaseResourceIT { kafka.close(); } - private void sendHookMessage(Referenceable entity) throws NotificationException { - String entityJson = InstanceSerialization.toJson(entity, true); - JSONArray jsonArray = new JSONArray(); - jsonArray.put(entityJson); - kafka.send(NotificationInterface.NotificationType.HOOK, jsonArray.toString()); + private void sendHookMessage(HookNotification.HookNotificationMessage message) throws NotificationException { + kafka.send(NotificationInterface.NotificationType.HOOK, message); } @Test - public void testConsumeHookMessage() throws Exception { - Referenceable entity = new Referenceable(DATABASE_TYPE); - dbName = "db" + randomString(); - entity.set("name", dbName); + public void testCreateEntity() throws Exception { + final Referenceable entity = new Referenceable(DATABASE_TYPE); + entity.set("name", "db" + randomString()); entity.set("description", randomString()); - sendHookMessage(entity); + sendHookMessage(new HookNotification.EntityCreateRequest(entity)); waitFor(1000, new Predicate() { @Override public boolean evaluate() throws Exception { - JSONArray results = - serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName)); + JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, + entity.get("name"))); return results.length() == 1; } }); } - @Test (dependsOnMethods = "testConsumeHookMessage") - public void testEnityDeduping() throws Exception { -// Referenceable db = serviceClient.getEntity(DATABASE_TYPE, "name", dbName); - Referenceable db = new Referenceable(DATABASE_TYPE); - db.set("name", dbName); - db.set("description", randomString()); + @Test + public void testUpdateEntityPartial() throws Exception { + final Referenceable entity = new Referenceable(DATABASE_TYPE); + final String dbName = "db" + randomString(); + entity.set("name", dbName); + entity.set("description", randomString()); + serviceClient.createEntity(entity); - Referenceable table = new Referenceable(HIVE_TABLE_TYPE); - final String tableName = randomString(); - table.set("name", tableName); - table.set("db", db); + final Referenceable newEntity = new Referenceable(DATABASE_TYPE); + newEntity.set("owner", randomString()); + sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name", dbName, newEntity)); + waitFor(1000, new Predicate() { + @Override + public boolean evaluate() throws Exception { + Referenceable localEntity = serviceClient.getEntity(DATABASE_TYPE, "name", dbName); + return (localEntity.get("owner") != null && localEntity.get("owner").equals(newEntity.get("owner"))); + } + }); + + //Its partial update and un-set fields are not updated + Referenceable actualEntity = serviceClient.getEntity(DATABASE_TYPE, "name", dbName); + assertEquals(actualEntity.get("description"), entity.get("description")); + } + + @Test + public void testUpdatePartialUpdatingQualifiedName() throws Exception { + final Referenceable entity = new Referenceable(DATABASE_TYPE); + final String dbName = "db" + randomString(); + entity.set("name", dbName); + entity.set("description", randomString()); + serviceClient.createEntity(entity); - sendHookMessage(table); + final Referenceable newEntity = new Referenceable(DATABASE_TYPE); + final String newName = "db" + randomString(); + newEntity.set("name", newName); + + sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name", dbName, newEntity)); waitFor(1000, new Predicate() { @Override public boolean evaluate() throws Exception { - JSONArray results = - serviceClient.searchByDSL(String.format("%s where name='%s'", HIVE_TABLE_TYPE, tableName)); + JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, + newName)); return results.length() == 1; } }); - JSONArray results = - serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName)); - Assert.assertEquals(results.length(), 1); + //no entity with the old qualified name + JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName)); + assertEquals(results.length(), 0); + } + @Test + public void testUpdateEntityFullUpdate() throws Exception { + Referenceable entity = new Referenceable(DATABASE_TYPE); + final String dbName = "db" + randomString(); + entity.set("name", dbName); + entity.set("description", randomString()); + serviceClient.createEntity(entity); + + final Referenceable newEntity = new Referenceable(DATABASE_TYPE); + newEntity.set("name", dbName); + newEntity.set("description", randomString()); + newEntity.set("owner", randomString()); + + //updating unique attribute + sendHookMessage(new HookNotification.EntityUpdateRequest(newEntity)); + waitFor(1000, new Predicate() { + @Override + public boolean evaluate() throws Exception { + JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, + dbName)); + return results.length() == 1; + } + }); + + Referenceable actualEntity = serviceClient.getEntity(DATABASE_TYPE, "name", dbName); + assertEquals(actualEntity.get("description"), newEntity.get("description")); + assertEquals(actualEntity.get("owner"), newEntity.get("owner")); + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/dad90970/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java index 7337eaf..f476af3 100755 --- a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java +++ b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java @@ -58,6 +58,8 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import static org.testng.Assert.assertEquals; + /** * Integration tests for Entity Jersey Resource. */ @@ -95,6 +97,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT { } @Test + //API should accept single entity (or jsonarray of entities) public void testSubmitSingleEntity() throws Exception { Referenceable databaseInstance = new Referenceable(DATABASE_TYPE); databaseInstance.set("name", randomString()); @@ -115,6 +118,34 @@ public class EntityJerseyResourceIT extends BaseResourceIT { } @Test + public void testEntityDeduping() throws Exception { + Referenceable db = new Referenceable(DATABASE_TYPE); + String dbName = "db" + randomString(); + db.set("name", dbName); + db.set("description", randomString()); + + serviceClient.createEntity(db); + JSONArray results = + serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName)); + assertEquals(results.length(), 1); + + //create entity again shouldn't create another instance with same unique attribute value + serviceClient.createEntity(db); + results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName)); + assertEquals(results.length(), 1); + + //Test the same across references + Referenceable table = new Referenceable(HIVE_TABLE_TYPE); + final String tableName = randomString(); + table.set("name", tableName); + table.set("db", db); + + serviceClient.createEntity(table); + results = serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName)); + assertEquals(results.length(), 1); + } + + @Test public void testEntityDefinitionAcrossTypeUpdate() throws Exception { //create type HierarchicalTypeDefinition<ClassType> typeDefinition = TypesUtil
