Repository: incubator-atlas Updated Branches: refs/heads/master c93e0972a -> 6f421e997
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java ---------------------------------------------------------------------- diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java b/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java index 213e46c..6102427 100755 --- a/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java +++ b/typesystem/src/main/java/org/apache/atlas/typesystem/Referenceable.java @@ -20,9 +20,12 @@ package org.apache.atlas.typesystem; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import org.apache.atlas.AtlasException; import org.apache.atlas.classification.InterfaceAudience; import org.apache.atlas.typesystem.persistence.Id; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -75,6 +78,27 @@ public class Referenceable extends Struct implements IReferenceableInstance { traits = ImmutableMap.copyOf(_traits); } + /** + * Construct a Referenceable from the given ITypedReferenceableInstance. + * + * @param instance the typed referenceable instance to copy + * + * @throws AtlasException if the referenceable can not be created + */ + public Referenceable(ITypedReferenceableInstance instance) throws AtlasException { + this(instance.getId()._getId(), instance.getTypeName(), instance.getValuesMap(), instance.getTraits(), + getTraits(instance)); + } + + /** + * No-arg constructor for serialization. + */ + @SuppressWarnings("unused") + private Referenceable() { + this("", "", Collections.<String, Object>emptyMap(), Collections.<String>emptyList(), + Collections.<String, IStruct>emptyMap()); + } + @Override public ImmutableList<String> getTraits() { return traitNames; @@ -89,4 +113,13 @@ public class Referenceable extends Struct implements IReferenceableInstance { public IStruct getTrait(String typeName) { return traits.get(typeName); } + + private static Map<String, IStruct> getTraits(ITypedReferenceableInstance instance) { + Map<String, IStruct> traits = new HashMap<>(); + + for (String traitName : instance.getTraits() ) { + traits.put(traitName, instance.getTrait(traitName)); + } + return traits; + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/typesystem/src/main/java/org/apache/atlas/typesystem/Struct.java ---------------------------------------------------------------------- diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/Struct.java b/typesystem/src/main/java/org/apache/atlas/typesystem/Struct.java index d57fbe0..d03e2c2 100755 --- a/typesystem/src/main/java/org/apache/atlas/typesystem/Struct.java +++ b/typesystem/src/main/java/org/apache/atlas/typesystem/Struct.java @@ -20,6 +20,7 @@ package org.apache.atlas.typesystem; import org.apache.atlas.classification.InterfaceAudience; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -41,6 +42,15 @@ public class Struct implements IStruct { } } + /** + * No-arg constructor for serialization. + */ + @SuppressWarnings("unused") + private Struct() { + this("", Collections.<String, Object>emptyMap()); + } + + @Override public String getTypeName() { return typeName; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/typesystem/src/main/resources/application.properties ---------------------------------------------------------------------- diff --git a/typesystem/src/main/resources/application.properties b/typesystem/src/main/resources/application.properties index 341acec..8d66dc3 100644 --- a/typesystem/src/main/resources/application.properties +++ b/typesystem/src/main/resources/application.properties @@ -61,6 +61,8 @@ atlas.kafka.data=target/data/kafka atlas.kafka.zookeeper.session.timeout.ms=400 atlas.kafka.zookeeper.sync.time.ms=20 atlas.kafka.auto.commit.interval.ms=100 +atlas.kafka.hook.group.id=atlas +atlas.kafka.entities.group.id=atlas_entities ######### Security Properties ######### http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java index fd05d28..f9c0cbb 100755 --- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java +++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java @@ -33,9 +33,13 @@ import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.RepositoryMetadataModule; +import org.apache.atlas.notification.NotificationInterface; import org.apache.atlas.notification.NotificationModule; +import org.apache.atlas.notification.entity.NotificationEntityChangeListener; import org.apache.atlas.repository.graph.GraphProvider; import org.apache.atlas.service.Services; +import org.apache.atlas.services.MetadataService; +import org.apache.atlas.typesystem.types.TypeSystem; import org.apache.atlas.web.filters.AtlasAuthenticationFilter; import org.apache.atlas.web.filters.AuditFilter; import org.apache.commons.configuration.Configuration; @@ -114,6 +118,7 @@ public class GuiceServletConfig extends GuiceServletContextListener { LoginProcessor loginProcessor = new LoginProcessor(); loginProcessor.login(); + initMetadataService(); startServices(); } @@ -154,4 +159,17 @@ public class GuiceServletConfig extends GuiceServletContextListener { Services services = injector.getInstance(Services.class); services.stop(); } + + // initialize the metadata service + private void initMetadataService() { + MetadataService metadataService = injector.getInstance(MetadataService.class); + + // add a listener for entity changes + NotificationInterface notificationInterface = injector.getInstance(NotificationInterface.class); + + NotificationEntityChangeListener listener = + new NotificationEntityChangeListener(notificationInterface, TypeSystem.getInstance()); + + metadataService.registerListener(listener); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/6f421e99/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java new file mode 100644 index 0000000..204b95a --- /dev/null +++ b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + +import com.google.inject.Inject; +import org.apache.atlas.notification.entity.EntityNotification; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.web.resources.BaseResourceIT; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import java.util.List; + +/** + * Entity Notification Integration Tests. + */ +@Guice(modules = NotificationModule.class) +public class EntityNotificationIT extends BaseResourceIT { + + @Inject + private NotificationInterface notificationInterface; + + @BeforeClass + public void setUp() throws Exception { + super.setUp(); + createTypeDefinitions(); + } + + @Test + public void testEntityNotification() throws Exception { + + List<NotificationConsumer<EntityNotification>> consumers = + notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1); + + NotificationConsumer<EntityNotification> consumer = consumers.iterator().next(); + final EntityNotificationConsumer notificationConsumer = new EntityNotificationConsumer(consumer); + Thread thread = new Thread(notificationConsumer); + thread.start(); + + createEntity("Sales", "Sales Database", "John ETL", "hdfs://host:8000/apps/warehouse/sales"); + + waitFor(10000, new Predicate() { + @Override + public boolean evaluate() throws Exception { + return notificationConsumer.entityNotification != null; + } + }); + + Assert.assertNotNull(notificationConsumer.entityNotification); + Assert.assertEquals(EntityNotification.OperationType.ENTITY_CREATE, notificationConsumer.entityNotification.getOperationType()); + Assert.assertEquals(DATABASE_TYPE, notificationConsumer.entityNotification.getEntity().getTypeName()); + Assert.assertEquals("Sales", notificationConsumer.entityNotification.getEntity().get("name")); + } + + private void createEntity(String name, String description, String owner, String locationUri, String... traitNames) + throws Exception { + + Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames); + referenceable.set("name", name); + referenceable.set("description", description); + referenceable.set("owner", owner); + referenceable.set("locationUri", locationUri); + referenceable.set("createTime", System.currentTimeMillis()); + + createInstance(referenceable); + } + + private static class EntityNotificationConsumer implements Runnable { + private final NotificationConsumer<EntityNotification> consumerIterator; + private EntityNotification entityNotification = null; + + public EntityNotificationConsumer(NotificationConsumer<EntityNotification> consumerIterator) { + this.consumerIterator = consumerIterator; + } + + @Override + public void run() { + while(consumerIterator.hasNext()) { + entityNotification = consumerIterator.next(); + } + } + } +}
