ATLAS-511 Ability to run multiple instances of Atlas Server with automatic failover to one active server (yhemanth via shwethags)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/8bde666b Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/8bde666b Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/8bde666b Branch: refs/heads/master Commit: 8bde666ba1986f5b6c9e20cad82d6037a6739db9 Parents: bca454e Author: Shwetha GS <[email protected]> Authored: Fri Apr 1 11:08:39 2016 +0530 Committer: Shwetha GS <[email protected]> Committed: Fri Apr 1 11:08:39 2016 +0530 ---------------------------------------------------------------------- addons/falcon-bridge/pom.xml | 5 + addons/hive-bridge/pom.xml | 5 + addons/sqoop-bridge/pom.xml | 5 + addons/storm-bridge/pom.xml | 5 + .../main/java/org/apache/atlas/AtlasClient.java | 6 + .../java/org/apache/atlas/AtlasClientTest.java | 30 ++ .../java/org/apache/atlas/AtlasConstants.java | 2 + distro/src/conf/atlas-application.properties | 11 +- .../apache/atlas/kafka/KafkaNotification.java | 4 + .../notification/AbstractNotification.java | 7 + .../notification/NotificationHookConsumer.java | 100 ++++- .../NotificationHookConsumerTest.java | 95 ++++- release-log.txt | 1 + .../audit/HBaseBasedAuditRepository.java | 39 +- .../graph/GraphBackedSearchIndexer.java | 41 ++- .../atlas/services/DefaultMetadataService.java | 62 +++- .../audit/HBaseBasedAuditRepositoryHATest.java | 94 +++++ .../graph/GraphBackedSearchIndexerTest.java | 94 +++++ .../DefaultMetadataServiceMockTest.java | 104 +++++- server-api/pom.xml | 13 + .../org/apache/atlas/ha/HAConfiguration.java | 196 ++++++++++ .../listener/ActiveStateChangeHandler.java | 49 +++ .../apache/atlas/ha/HAConfigurationTest.java | 90 +++++ .../atlas/typesystem/types/TypeSystem.java | 10 +- .../main/resources/atlas-application.properties | 6 +- .../atlas/typesystem/types/TypeSystemTest.java | 47 ++- .../src/main/java/org/apache/atlas/Atlas.java | 8 +- .../atlas/web/filters/ActiveServerFilter.java | 139 +++++++ 
.../atlas/web/listeners/GuiceServletConfig.java | 47 ++- .../service/ActiveInstanceElectorModule.java | 49 +++ .../service/ActiveInstanceElectorService.java | 197 ++++++++++ .../atlas/web/service/ActiveInstanceState.java | 109 ++++++ .../atlas/web/service/CuratorFactory.java | 94 +++++ .../apache/atlas/web/service/ServiceState.java | 96 +++++ .../web/filters/ActiveServerFilterTest.java | 172 +++++++++ .../ActiveInstanceElectorServiceTest.java | 364 +++++++++++++++++++ .../web/service/ActiveInstanceStateTest.java | 137 +++++++ .../atlas/web/service/ServiceStateTest.java | 67 ++++ 38 files changed, 2530 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/addons/falcon-bridge/pom.xml ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml index ad345c5..afbc150 100644 --- a/addons/falcon-bridge/pom.xml +++ b/addons/falcon-bridge/pom.xml @@ -151,6 +151,11 @@ <version>${project.version}</version> </artifactItem> <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-server-api</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>${scala.version}</version> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/addons/hive-bridge/pom.xml ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml index 8bfbb13..720b6d1 100755 --- a/addons/hive-bridge/pom.xml +++ b/addons/hive-bridge/pom.xml @@ -229,6 +229,11 @@ <version>${project.version}</version> </artifactItem> <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-server-api</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> 
<groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>${scala.version}</version> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/addons/sqoop-bridge/pom.xml ---------------------------------------------------------------------- diff --git a/addons/sqoop-bridge/pom.xml b/addons/sqoop-bridge/pom.xml index 343bb4e..4b5dbb1 100644 --- a/addons/sqoop-bridge/pom.xml +++ b/addons/sqoop-bridge/pom.xml @@ -234,6 +234,11 @@ <version>${project.version}</version> </artifactItem> <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-server-api</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>${scala.version}</version> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/addons/storm-bridge/pom.xml ---------------------------------------------------------------------- diff --git a/addons/storm-bridge/pom.xml b/addons/storm-bridge/pom.xml index e3b4ed7..9efa568 100644 --- a/addons/storm-bridge/pom.xml +++ b/addons/storm-bridge/pom.xml @@ -184,6 +184,11 @@ <version>${project.version}</version> </artifactItem> <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>atlas-server-api</artifactId> + <version>${project.version}</version> + </artifactItem> + <artifactItem> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>${scala.version}</version> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/client/src/main/java/org/apache/atlas/AtlasClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java index 1fc811a..18c0569 100755 --- a/client/src/main/java/org/apache/atlas/AtlasClient.java +++ b/client/src/main/java/org/apache/atlas/AtlasClient.java @@ -145,6 +145,12 @@ 
public class AtlasClient { return true; } catch (ClientHandlerException che) { return false; + } catch (AtlasServiceException ase) { + if (ase.getStatus().equals(ClientResponse.Status.SERVICE_UNAVAILABLE)) { + LOG.warn("Received SERVICE_UNAVAILABLE, server is not yet ready"); + return false; + } + throw ase; } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/client/src/test/java/org/apache/atlas/AtlasClientTest.java ---------------------------------------------------------------------- diff --git a/client/src/test/java/org/apache/atlas/AtlasClientTest.java b/client/src/test/java/org/apache/atlas/AtlasClientTest.java index 1e7eed1..6e1fbe2 100644 --- a/client/src/test/java/org/apache/atlas/AtlasClientTest.java +++ b/client/src/test/java/org/apache/atlas/AtlasClientTest.java @@ -28,6 +28,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.testng.Assert.fail; public class AtlasClientTest { @@ -64,4 +65,33 @@ public class AtlasClientTest { new ClientHandlerException()); assertFalse(atlasClient.isServerReady()); } + + @Test + public void shouldReturnFalseIfServiceIsUnavailable() throws AtlasServiceException { + WebResource webResource = mock(WebResource.class); + AtlasClient atlasClient = new AtlasClient(webResource); + WebResource.Builder builder = setupBuilder(webResource); + ClientResponse response = mock(ClientResponse.class); + when(response.getStatus()).thenReturn(Response.Status.SERVICE_UNAVAILABLE.getStatusCode()); + when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.SERVICE_UNAVAILABLE); + + when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenReturn(response); + + assertFalse(atlasClient.isServerReady()); + } + + @Test(expectedExceptions = AtlasServiceException.class) + public void shouldThrowErrorIfAnyResponseOtherThanServiceUnavailable() 
throws AtlasServiceException { + WebResource webResource = mock(WebResource.class); + AtlasClient atlasClient = new AtlasClient(webResource); + WebResource.Builder builder = setupBuilder(webResource); + ClientResponse response = mock(ClientResponse.class); + when(response.getStatus()).thenReturn(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); + when(response.getClientResponseStatus()).thenReturn(ClientResponse.Status.INTERNAL_SERVER_ERROR); + + when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenReturn(response); + + atlasClient.isServerReady(); + fail("Should throw exception"); + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/common/src/main/java/org/apache/atlas/AtlasConstants.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/atlas/AtlasConstants.java b/common/src/main/java/org/apache/atlas/AtlasConstants.java index 85719c9..950ed6b 100644 --- a/common/src/main/java/org/apache/atlas/AtlasConstants.java +++ b/common/src/main/java/org/apache/atlas/AtlasConstants.java @@ -28,4 +28,6 @@ public final class AtlasConstants { public static final String CLUSTER_NAME_KEY = "atlas.cluster.name"; public static final String DEFAULT_CLUSTER_NAME = "primary"; public static final String CLUSTER_NAME_ATTRIBUTE = "clusterName"; + public static final String SYSTEM_PROPERTY_APP_PORT = "atlas.app.port"; + public static final String DEFAULT_APP_PORT_STR = "21000"; } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/distro/src/conf/atlas-application.properties ---------------------------------------------------------------------- diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties index 453435b..00c5d5a 100755 --- a/distro/src/conf/atlas-application.properties +++ b/distro/src/conf/atlas-application.properties @@ -95,4 +95,13 @@ atlas.http.authentication.enabled=false 
atlas.http.authentication.type=simple ######### Server Properties ######### -atlas.rest.address=http://localhost:21000 \ No newline at end of file +atlas.rest.address=http://localhost:21000 + +######### High Availability Configuration ######## +atlas.server.ha.enabled=false +atlas.server.ids=id1 +atlas.server.address.id1=localhost:21000 +atlas.server.ha.zookeeper.connect=localhost:2181 +atlas.server.ha.zookeeper.retry.sleeptime.ms=1000 +atlas.server.ha.zookeeper.num.retries=3 +atlas.server.ha.zookeeper.session.timeout.ms=20000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 2701039..889af11 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -127,6 +127,10 @@ public class KafkaNotification extends AbstractNotification implements Service { @Override public void start() throws AtlasException { + if (isHAEnabled()) { + LOG.info("Not starting embedded instances when HA is enabled."); + return; + } if (isEmbedded()) { try { startZk(); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java index 885242d..596f988 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java +++ 
b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java @@ -18,6 +18,7 @@ package org.apache.atlas.notification; import org.apache.atlas.AtlasException; +import org.apache.atlas.ha.HAConfiguration; import org.apache.commons.configuration.Configuration; import java.util.Arrays; @@ -30,12 +31,14 @@ public abstract class AbstractNotification implements NotificationInterface { private static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded"; private final boolean embedded; + private final boolean isHAEnabled; // ----- Constructors ------------------------------------------------------ public AbstractNotification(Configuration applicationProperties) throws AtlasException { this.embedded = applicationProperties.getBoolean(PROPERTY_EMBEDDED, false); + this.isHAEnabled = HAConfiguration.isHAEnabled(applicationProperties); } @@ -50,6 +53,10 @@ public abstract class AbstractNotification implements NotificationInterface { return embedded; } + protected final boolean isHAEnabled() { + return isHAEnabled; + } + @Override public <T> void send(NotificationType type, List<T> messages) throws NotificationException { String[] strMessages = new String[messages.size()]; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 2fcbcd3..ca53fd2 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -23,6 +23,8 @@ import kafka.consumer.ConsumerTimeoutException; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import 
org.apache.atlas.AtlasException; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.service.Service; import org.apache.commons.configuration.Configuration; @@ -30,39 +32,69 @@ import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * Consumer of notifications from hooks e.g., hive hook etc. */ @Singleton -public class NotificationHookConsumer implements Service { +public class NotificationHookConsumer implements Service, ActiveStateChangeHandler { private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class); public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address"; public static final int SERVER_READY_WAIT_TIME_MS = 1000; - @Inject private NotificationInterface notificationInterface; private ExecutorService executors; private String atlasEndpoint; + private Configuration applicationProperties; + private List<HookConsumer> consumers; + + @Inject + public NotificationHookConsumer(NotificationInterface notificationInterface) { + this.notificationInterface = notificationInterface; + } @Override public void start() throws AtlasException { - Configuration applicationProperties = ApplicationProperties.get(); + Configuration configuration = ApplicationProperties.get(); + startInternal(configuration, null); + } - atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000"); + void startInternal(Configuration configuration, + ExecutorService executorService) { + 
this.applicationProperties = configuration; + this.atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000"); + if (consumers == null) { + consumers = new ArrayList<>(); + } + if (executorService != null) { + executors = executorService; + } + if (!HAConfiguration.isHAEnabled(configuration)) { + LOG.info("HA is disabled, starting consumers inline."); + startConsumers(executorService); + } + } + + private void startConsumers(ExecutorService executorService) { int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1); - List<NotificationConsumer<HookNotification.HookNotificationMessage>> consumers = + List<NotificationConsumer<HookNotification.HookNotificationMessage>> notificationConsumers = notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads); - executors = Executors.newFixedThreadPool(consumers.size()); - - for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : consumers) { - executors.submit(new HookConsumer(consumer)); + if (executorService == null) { + executorService = Executors.newFixedThreadPool(notificationConsumers.size()); + } + executors = executorService; + for (final NotificationConsumer<HookNotification.HookNotificationMessage> consumer : notificationConsumers) { + HookConsumer hookConsumer = new HookConsumer(consumer); + consumers.add(hookConsumer); + executors.submit(hookConsumer); } } @@ -71,14 +103,52 @@ public class NotificationHookConsumer implements Service { //Allow for completion of outstanding work notificationInterface.close(); try { - if (executors != null && !executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) { - LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly"); + if (executors != null) { + stopConsumerThreads(); + executors.shutdownNow(); + if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) { + LOG.error("Timed out waiting for consumer threads to shut down, 
exiting uncleanly"); + } + executors = null; } } catch (InterruptedException e) { LOG.error("Failure in shutting down consumers"); } } + private void stopConsumerThreads() { + if (consumers != null) { + for (HookConsumer consumer : consumers) { + consumer.stop(); + } + consumers.clear(); + } + } + + /** + * Start Kafka consumer threads that read from Kafka topic when server is activated. + * + * Since the consumers create / update entities to the shared backend store, only the active instance + * should perform this activity. Hence, these threads are started only on server activation. + */ + @Override + public void instanceIsActive() { + LOG.info("Reacting to active state: initializing Kafka consumers"); + startConsumers(executors); + } + + /** + * Stop Kafka consumer threads that read from Kafka topic when server is de-activated. + * + * Since the consumers create / update entities to the shared backend store, only the active instance + * should perform this activity. Hence, these threads are stopped only on server deactivation. 
+ */ + @Override + public void instanceIsPassive() { + LOG.info("Reacting to passive state: shutting down Kafka consumers."); + stop(); + } + static class Timer { public void sleep(int interval) throws InterruptedException { Thread.sleep(interval); @@ -87,6 +157,7 @@ public class NotificationHookConsumer implements Service { class HookConsumer implements Runnable { private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer; + private final AtomicBoolean shouldRun = new AtomicBoolean(false); public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) { this.consumer = consumer; @@ -102,12 +173,13 @@ public class NotificationHookConsumer implements Service { @Override public void run() { + shouldRun.set(true); if (!serverAvailable(new NotificationHookConsumer.Timer())) { return; } - while (true) { + while (shouldRun.get()) { try { if (hasNext()) { HookNotification.HookNotificationMessage message = consumer.next(); @@ -177,5 +249,9 @@ public class NotificationHookConsumer implements Service { LOG.info("Atlas Server is ready, can start reading Kafka events."); return true; } + + public void stop() { + shouldRun.set(false); + } } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java index 02255a7..177de6d 100644 --- a/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java @@ -20,18 +20,43 @@ package org.apache.atlas.notification; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasServiceException; 
import org.apache.hadoop.security.UserGroupInformation; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.commons.configuration.Configuration; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; + import static org.mockito.Mockito.*; import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertTrue; public class NotificationHookConsumerTest { + @Mock + private NotificationInterface notificationInterface; + + @Mock + private AtlasClient atlasClient; + + @Mock + private Configuration configuration; + + @Mock + private ExecutorService executorService; + + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + } + @Test public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException { - final AtlasClient atlasClient = mock(AtlasClient.class); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) { @Override @@ -49,8 +74,7 @@ public class NotificationHookConsumerTest { @Test public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException { - final AtlasClient atlasClient = mock(AtlasClient.class); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) { @Override @@ -68,8 +92,7 @@ public class 
NotificationHookConsumerTest { @Test public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException { - final AtlasClient atlasClient = mock(AtlasClient.class); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) { @Override @@ -86,8 +109,7 @@ public class NotificationHookConsumerTest { @Test public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException { - final AtlasClient atlasClient = mock(AtlasClient.class); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) { @Override @@ -101,4 +123,61 @@ public class NotificationHookConsumerTest { assertFalse(hookConsumer.serverAvailable(timer)); } + + @Test + public void testConsumersStartedIfHAIsDisabled() { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); + when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); + List<NotificationConsumer<Object>> consumers = new ArrayList(); + consumers.add(mock(NotificationConsumer.class)); + when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). 
+ thenReturn(consumers); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface); + notificationHookConsumer.startInternal(configuration, executorService); + verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1); + verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class)); + } + + @Test + public void testConsumersAreNotStartedIfHAIsEnabled() { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); + List<NotificationConsumer<Object>> consumers = new ArrayList(); + consumers.add(mock(NotificationConsumer.class)); + when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). + thenReturn(consumers); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface); + notificationHookConsumer.startInternal(configuration, executorService); + verifyZeroInteractions(notificationInterface); + } + + @Test + public void testConsumersAreStartedWhenInstanceBecomesActive() { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); + List<NotificationConsumer<Object>> consumers = new ArrayList(); + consumers.add(mock(NotificationConsumer.class)); + when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). 
+ thenReturn(consumers); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface); + notificationHookConsumer.startInternal(configuration, executorService); + notificationHookConsumer.instanceIsActive(); + verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1); + verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class)); + } + + @Test + public void testConsumersAreStoppedWhenInstanceBecomesPassive() { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); + List<NotificationConsumer<Object>> consumers = new ArrayList(); + consumers.add(mock(NotificationConsumer.class)); + when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). + thenReturn(consumers); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface); + notificationHookConsumer.startInternal(configuration, executorService); + notificationHookConsumer.instanceIsPassive(); + verify(notificationInterface).close(); + verify(executorService).shutdownNow(); + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index aaef9e3..87e39e6 100644 --- a/release-log.txt +++ b/release-log.txt @@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags) ALL CHANGES: +ATLAS-511 Ability to run multiple instances of Atlas Server with automatic failover to one active server (yhemanth via shwethags) ATLAS-577 Integrate entity audit with DefaultMetadataService (shwethags) ATLAS-588 import-hive.sh fails 
while importing partitions for a non-partitioned table (sumasai via shwethags) ATLAS-575 jetty-maven-plugin fails with ShutdownMonitorThread already started (shwethags) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java index ae6e988..c4329a5 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java @@ -18,8 +18,12 @@ package org.apache.atlas.repository.audit; +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Singleton; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.service.Service; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; @@ -59,7 +63,8 @@ import java.util.List; * and only 1 version is kept, there can be just 1 audit event per entity id + timestamp. This is ok for one atlas server. 
* But if there are more than one atlas servers, we should use server id in the key */ -public class HBaseBasedAuditRepository implements Service, EntityAuditRepository { +@Singleton +public class HBaseBasedAuditRepository implements Service, EntityAuditRepository, ActiveStateChangeHandler { private static final Logger LOG = LoggerFactory.getLogger(HBaseBasedAuditRepository.class); public static final String CONFIG_PREFIX = "atlas.audit"; @@ -237,23 +242,47 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository @Override public void start() throws AtlasException { - Configuration atlasConf = ApplicationProperties.get(); + Configuration configuration = ApplicationProperties.get(); + startInternal(configuration, getHBaseConfiguration(configuration)); + } + + @VisibleForTesting + void startInternal(Configuration atlasConf, + org.apache.hadoop.conf.Configuration hbaseConf) throws AtlasException { String tableNameStr = atlasConf.getString(CONFIG_TABLE_NAME, DEFAULT_TABLE_NAME); tableName = TableName.valueOf(tableNameStr); try { - org.apache.hadoop.conf.Configuration hbaseConf = getHBaseConfiguration(atlasConf); - connection = ConnectionFactory.createConnection(hbaseConf); + connection = createConnection(hbaseConf); } catch (IOException e) { throw new AtlasException(e); } - createTableIfNotExists(); + if (!HAConfiguration.isHAEnabled(atlasConf)) { + LOG.info("HA is disabled. 
Hence creating table on startup."); + createTableIfNotExists(); + } + } + + @VisibleForTesting + protected Connection createConnection(org.apache.hadoop.conf.Configuration hbaseConf) throws IOException { + return ConnectionFactory.createConnection(hbaseConf); } @Override public void stop() throws AtlasException { close(connection); } + + @Override + public void instanceIsActive() throws AtlasException { + LOG.info("Reacting to active: Creating HBase table for Audit if required."); + createTableIfNotExists(); + } + + @Override + public void instanceIsPassive() { + LOG.info("Reacting to passive: No action for now."); + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java index 7eccc58..e7e8fb9 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java @@ -26,12 +26,15 @@ import com.thinkaurelius.titan.core.schema.TitanGraphIndex; import com.thinkaurelius.titan.core.schema.TitanManagement; import com.tinkerpop.blueprints.Edge; import com.tinkerpop.blueprints.Vertex; +import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.discovery.SearchIndexer; +import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.IndexCreationException; import org.apache.atlas.repository.IndexException; import org.apache.atlas.repository.RepositoryException; +import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.typesystem.types.AttributeInfo; 
import org.apache.atlas.typesystem.types.ClassType; import org.apache.atlas.typesystem.types.DataTypes; @@ -39,6 +42,7 @@ import org.apache.atlas.typesystem.types.IDataType; import org.apache.atlas.typesystem.types.Multiplicity; import org.apache.atlas.typesystem.types.StructType; import org.apache.atlas.typesystem.types.TraitType; +import org.apache.commons.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +57,7 @@ import java.util.Map; /** * Adds index for properties of a given type when its added before any instances are added. */ -public class GraphBackedSearchIndexer implements SearchIndexer { +public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChangeHandler { private static final Logger LOG = LoggerFactory.getLogger(GraphBackedSearchIndexer.class); @@ -67,13 +71,16 @@ public class GraphBackedSearchIndexer implements SearchIndexer { @Inject public GraphBackedSearchIndexer(GraphProvider<TitanGraph> graphProvider) throws RepositoryException, - IndexException { + AtlasException { + this(graphProvider, ApplicationProperties.get()); + } + GraphBackedSearchIndexer(GraphProvider<TitanGraph> graphProvider, Configuration configuration) + throws IndexException, RepositoryException { this.titanGraph = graphProvider.get(); - - /* Create the transaction for indexing. - */ - initialize(); + if (!HAConfiguration.isHAEnabled(configuration)) { + initialize(); + } } /** @@ -355,6 +362,28 @@ public class GraphBackedSearchIndexer implements SearchIndexer { } } + /** + * Initialize global indices for Titan graph on server activation. + * + * Since the indices are shared state, we need to do this only from an active instance. 
+ */ + @Override + public void instanceIsActive() throws AtlasException { + LOG.info("Reacting to active: initializing index"); + try { + initialize(); + } catch (RepositoryException e) { + throw new AtlasException("Error in reacting to active on initialization", e); + } catch (IndexException e) { + throw new AtlasException("Error in reacting to active on initialization", e); + } + } + + @Override + public void instanceIsPassive() { + LOG.info("Reacting to passive state: No action right now."); + } + /* Commenting this out since we do not need an index for edge label here private void createEdgeMixedIndex(String propertyName) { EdgeLabel edgeLabel = management.getEdgeLabel(propertyName); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java index 40728bc..cd1161a 100755 --- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java +++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java @@ -22,9 +22,13 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.inject.Provider; + +import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.classification.InterfaceAudience; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.listener.TypesChangeListener; import org.apache.atlas.repository.MetadataRepository; @@ -58,6 +62,7 @@ import 
org.apache.atlas.typesystem.types.TypeUtils.Pair; import org.apache.atlas.typesystem.types.ValueConversionException; import org.apache.atlas.typesystem.types.utils.TypesUtil; import org.apache.atlas.utils.ParamChecker; +import org.apache.commons.configuration.Configuration; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -71,13 +76,14 @@ import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; /** * Simple wrapper over TypeSystem and MetadataRepository services with hooks * for listening to changes to the repository. */ @Singleton -public class DefaultMetadataService implements MetadataService { +public class DefaultMetadataService implements MetadataService, ActiveStateChangeHandler { private static final Logger LOG = LoggerFactory.getLogger(DefaultMetadataService.class); @@ -89,6 +95,8 @@ public class DefaultMetadataService implements MetadataService { private final Collection<TypesChangeListener> typeChangeListeners = new LinkedHashSet<>(); private final Collection<EntityChangeListener> entityChangeListeners = new LinkedHashSet<>(); + private boolean wasInitialized = false; + @Inject DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore, final IBootstrapTypesRegistrar typesRegistrar, @@ -96,14 +104,15 @@ public class DefaultMetadataService implements MetadataService { final Collection<Provider<EntityChangeListener>> entityListenerProviders) throws AtlasException { this(repository, typeStore, typesRegistrar, typeListenerProviders, entityListenerProviders, - TypeSystem.getInstance()); + TypeSystem.getInstance(), ApplicationProperties.get()); } DefaultMetadataService(final MetadataRepository repository, final ITypeStore typeStore, final IBootstrapTypesRegistrar typesRegistrar, final Collection<Provider<TypesChangeListener>> 
typeListenerProviders, final Collection<Provider<EntityChangeListener>> entityListenerProviders, - final TypeSystem typeSystem) throws AtlasException { + final TypeSystem typeSystem, + final Configuration configuration) throws AtlasException { this.typeStore = typeStore; this.typesRegistrar = typesRegistrar; this.typeSystem = typeSystem; @@ -117,25 +126,37 @@ public class DefaultMetadataService implements MetadataService { entityChangeListeners.add(provider.get()); } - restoreTypeSystem(); - - typesRegistrar.registerTypes(ReservedTypesRegistrar.getTypesDir(), typeSystem, this); + if (!HAConfiguration.isHAEnabled(configuration)) { + restoreTypeSystem(); + } } - private void restoreTypeSystem() { + private void restoreTypeSystem() throws AtlasException { LOG.info("Restoring type system from the store"); - try { - TypesDef typesDef = typeStore.restore(); + TypesDef typesDef = typeStore.restore(); + if (!wasInitialized) { + LOG.info("Initializing type system for the first time."); typeSystem.defineTypes(typesDef); // restore types before creating super types createSuperTypes(); - } catch (AtlasException e) { - throw new RuntimeException(e); + typesRegistrar.registerTypes(ReservedTypesRegistrar.getTypesDir(), typeSystem, this); + wasInitialized = true; + } else { + LOG.info("Type system was already initialized, refreshing cache."); + refreshCache(typesDef); } LOG.info("Restored type system from the store"); } + private void refreshCache(TypesDef typesDef) throws AtlasException { + TypeSystem.TransientTypeSystem transientTypeSystem + = typeSystem.createTransientTypeSystem(typesDef, true); + Map<String, IDataType> typesAdded = transientTypeSystem.getTypesAdded(); + LOG.info("Number of types got from transient type system: " + typesAdded.size()); + typeSystem.commitTypes(typesAdded); + } + private static final AttributeDefinition NAME_ATTRIBUTE = TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE); private static final AttributeDefinition 
DESCRIPTION_ATTRIBUTE = @@ -683,4 +704,23 @@ public class DefaultMetadataService implements MetadataService { listener.onEntitiesDeleted(entities); } } + + /** + * Create or restore the {@link TypeSystem} cache on server activation. + * + * When an instance is passive, types could be created outside of its cache by the active instance. + * Hence, when this instance becomes active, it needs to restore the cache from the backend store. + * The first time initialization happens, the indices for these types also need to be created. + * This must happen only from the active instance, as it updates shared backend state. + */ + @Override + public void instanceIsActive() throws AtlasException { + LOG.info("Reacting to active state: restoring type system"); + restoreTypeSystem(); + } + + @Override + public void instanceIsPassive() { + LOG.info("Reacting to passive state: no action right now"); + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryHATest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryHATest.java b/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryHATest.java new file mode 100644 index 0000000..2f7edb4 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepositoryHATest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.audit; + +import org.apache.atlas.AtlasException; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.commons.configuration.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.io.IOException; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +public class HBaseBasedAuditRepositoryHATest { + + @Mock + private Configuration configuration; + + @Mock + private org.apache.hadoop.conf.Configuration hbaseConf; + + @Mock + private Connection connection; + + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testTableShouldNotBeCreatedOnStartIfHAIsEnabled() throws IOException, AtlasException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getString(HBaseBasedAuditRepository.CONFIG_TABLE_NAME, + HBaseBasedAuditRepository.DEFAULT_TABLE_NAME)). 
+ thenReturn(HBaseBasedAuditRepository.DEFAULT_TABLE_NAME); + HBaseBasedAuditRepository auditRepository = new HBaseBasedAuditRepository() { + @Override + protected Connection createConnection(org.apache.hadoop.conf.Configuration hbaseConf) { + return connection; + } + }; + auditRepository.startInternal(configuration, hbaseConf); + + verifyZeroInteractions(connection); + } + + @Test + public void testShouldCreateTableWhenReactingToActive() throws AtlasException, IOException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(configuration.getString(HBaseBasedAuditRepository.CONFIG_TABLE_NAME, + HBaseBasedAuditRepository.DEFAULT_TABLE_NAME)). + thenReturn(HBaseBasedAuditRepository.DEFAULT_TABLE_NAME); + TableName tableName = TableName.valueOf(HBaseBasedAuditRepository.DEFAULT_TABLE_NAME); + Admin admin = mock(Admin.class); + when(connection.getAdmin()).thenReturn(admin); + when(admin.tableExists(tableName)).thenReturn(true); + HBaseBasedAuditRepository auditRepository = new HBaseBasedAuditRepository() { + @Override + protected Connection createConnection(org.apache.hadoop.conf.Configuration hbaseConf) { + return connection; + } + }; + auditRepository.startInternal(configuration, hbaseConf); + auditRepository.instanceIsActive(); + + verify(connection).getAdmin(); + verify(admin).tableExists(tableName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexerTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexerTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexerTest.java new file mode 100644 index 0000000..87fdf87 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexerTest.java @@ -0,0 +1,94 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.repository.graph; + +import com.thinkaurelius.titan.core.TitanGraph; +import com.thinkaurelius.titan.core.schema.TitanManagement; +import org.apache.atlas.AtlasException; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.IndexException; +import org.apache.atlas.repository.RepositoryException; +import org.apache.commons.configuration.Configuration; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +public class GraphBackedSearchIndexerTest { + + @Mock + private Configuration configuration; + + @Mock + private GraphProvider<TitanGraph> graphProvider; + + @Mock + private TitanGraph titanGraph; + + @Mock + private TitanManagement titanManagement; + + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testSearchIndicesAreInitializedOnConstructionWhenHAIsDisabled() throws 
IndexException, RepositoryException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); + when(graphProvider.get()).thenReturn(titanGraph); + when(titanGraph.getManagementSystem()).thenReturn(titanManagement); + when(titanManagement.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true); + + GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(graphProvider, configuration); + + verify(titanManagement).containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY); + } + + @Test + public void testSearchIndicesAreNotInitializedOnConstructionWhenHAIsEnabled() throws IndexException, RepositoryException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(graphProvider.get()).thenReturn(titanGraph); + when(titanGraph.getManagementSystem()).thenReturn(titanManagement); + when(titanManagement.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true); + + GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(graphProvider, configuration); + + verifyZeroInteractions(titanManagement); + + } + + @Test + public void testIndicesAreReinitializedWhenServerBecomesActive() throws AtlasException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + when(graphProvider.get()).thenReturn(titanGraph); + when(titanGraph.getManagementSystem()).thenReturn(titanManagement); + when(titanManagement.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)).thenReturn(true); + + GraphBackedSearchIndexer graphBackedSearchIndexer = new GraphBackedSearchIndexer(graphProvider, configuration); + graphBackedSearchIndexer.instanceIsActive(); + + verify(titanManagement).containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java b/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java index 0685e19..effee2a 100644 --- a/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java +++ b/repository/src/test/java/org/apache/atlas/services/DefaultMetadataServiceMockTest.java @@ -25,28 +25,126 @@ import org.apache.atlas.listener.TypesChangeListener; import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.typestore.ITypeStore; import org.apache.atlas.typesystem.types.TypeSystem; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.listener.TypesChangeListener; +import org.apache.atlas.repository.MetadataRepository; +import org.apache.atlas.repository.typestore.ITypeStore; +import org.apache.atlas.typesystem.TypesDef; +import org.apache.atlas.typesystem.types.IDataType; +import org.apache.atlas.typesystem.types.TypeSystem; +import org.apache.commons.configuration.Configuration; +import org.mockito.Matchers; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.util.ArrayList; +import java.util.HashMap; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; public class DefaultMetadataServiceMockTest { + @Mock + private IBootstrapTypesRegistrar typesRegistrar; + + @Mock + private TypeSystem typeSystem; + + @Mock + private MetadataRepository metadataRepository; + 
+ @Mock + private ITypeStore typeStore; + + @Mock + private Configuration configuration; + + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + } + @Test public void testShouldInvokeTypesRegistrarOnCreation() throws AtlasException { - IBootstrapTypesRegistrar typesRegistrar = mock(IBootstrapTypesRegistrar.class); - TypeSystem typeSystem = mock(TypeSystem.class); when(typeSystem.isRegistered(any(String.class))).thenReturn(true); + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); DefaultMetadataService defaultMetadataService = new DefaultMetadataService(mock(MetadataRepository.class), mock(ITypeStore.class), typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(), - new ArrayList<Provider<EntityChangeListener>>(), typeSystem); + new ArrayList<Provider<EntityChangeListener>>(), typeSystem, configuration); + + verify(typesRegistrar).registerTypes(ReservedTypesRegistrar.getTypesDir(), + typeSystem, defaultMetadataService); + } + + @Test + public void testShouldNotRestoreTypesIfHAIsEnabled() throws AtlasException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + + DefaultMetadataService defaultMetadataService = new DefaultMetadataService(metadataRepository, + typeStore, + typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(), + new ArrayList<Provider<EntityChangeListener>>(), typeSystem, configuration); + + verifyZeroInteractions(typeStore); + verifyZeroInteractions(typeSystem); + verifyZeroInteractions(typesRegistrar); + } + + @Test + public void testShouldRestoreTypeSystemOnServerActive() throws AtlasException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + + TypesDef typesDef = mock(TypesDef.class); + when(typeStore.restore()).thenReturn(typesDef); + when(typeSystem.isRegistered(any(String.class))).thenReturn(true); + + DefaultMetadataService 
defaultMetadataService = new DefaultMetadataService(metadataRepository, + typeStore, + typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(), + new ArrayList<Provider<EntityChangeListener>>(), typeSystem, configuration); + defaultMetadataService.instanceIsActive(); + verify(typeStore).restore(); + verify(typeSystem).defineTypes(typesDef); verify(typesRegistrar).registerTypes(ReservedTypesRegistrar.getTypesDir(), typeSystem, defaultMetadataService); } + + @Test + public void testShouldOnlyRestoreCacheOnServerActiveIfAlreadyDoneOnce() throws AtlasException { + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); + + TypesDef typesDef = mock(TypesDef.class); + when(typeStore.restore()).thenReturn(typesDef); + when(typeSystem.isRegistered(any(String.class))).thenReturn(true); + + TypeSystem.TransientTypeSystem transientTypeSystem = mock(TypeSystem.TransientTypeSystem.class); + HashMap<String, IDataType> typesAdded = new HashMap<>(); + when(transientTypeSystem.getTypesAdded()).thenReturn(typesAdded); + when(typeSystem.createTransientTypeSystem(typesDef, true)). + thenReturn(transientTypeSystem); + DefaultMetadataService defaultMetadataService = new DefaultMetadataService(metadataRepository, + typeStore, + typesRegistrar, new ArrayList<Provider<TypesChangeListener>>(), + new ArrayList<Provider<EntityChangeListener>>(), typeSystem, configuration); + + defaultMetadataService.instanceIsActive(); + defaultMetadataService.instanceIsPassive(); + defaultMetadataService.instanceIsActive(); + + verify(typeStore, times(2)).restore(); + verify(typeSystem, times(1)).defineTypes(typesDef); + verify(typesRegistrar, times(1)). 
+ registerTypes(ReservedTypesRegistrar.getTypesDir(), typeSystem, defaultMetadataService); + verify(typeSystem, times(1)).createTransientTypeSystem(typesDef, true); + verify(typeSystem, times(1)).commitTypes(typesAdded); + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/server-api/pom.xml ---------------------------------------------------------------------- diff --git a/server-api/pom.xml b/server-api/pom.xml index 93a0358..d3e84c4 100644 --- a/server-api/pom.xml +++ b/server-api/pom.xml @@ -47,6 +47,19 @@ <groupId>org.apache.atlas</groupId> <artifactId>atlas-typesystem</artifactId> </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + </dependency> + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-client</artifactId> + </dependency> </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/server-api/src/main/java/org/apache/atlas/ha/HAConfiguration.java ---------------------------------------------------------------------- diff --git a/server-api/src/main/java/org/apache/atlas/ha/HAConfiguration.java b/server-api/src/main/java/org/apache/atlas/ha/HAConfiguration.java new file mode 100644 index 0000000..06977c5 --- /dev/null +++ b/server-api/src/main/java/org/apache/atlas/ha/HAConfiguration.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.ha; + +import org.apache.atlas.AtlasConstants; +import org.apache.atlas.AtlasException; +import org.apache.atlas.security.SecurityProperties; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.net.NetUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; + +/** + * A wrapper for getting configuration entries related to HighAvailability. + */ +public class HAConfiguration { + + private static final Logger LOG = LoggerFactory.getLogger(HAConfiguration.class); + + public static final String ATLAS_SERVER_HA_PREFIX = "atlas.server.ha"; + public static final String ATLAS_SERVER_HA_ENABLED_KEY = ATLAS_SERVER_HA_PREFIX + ".enabled"; + public static final String ATLAS_SERVER_ADDRESS_PREFIX = "atlas.server.address."; + public static final String ATLAS_SERVER_IDS = "atlas.server.ids"; + public static final String HA_ZOOKEEPER_CONNECT = ATLAS_SERVER_HA_PREFIX + ".zookeeper.connect"; + public static final int DEFAULT_ZOOKEEPER_CONNECT_SLEEPTIME_MILLIS = 1000; + public static final String HA_ZOOKEEPER_RETRY_SLEEPTIME_MILLIS = ATLAS_SERVER_HA_PREFIX + ".zookeeper.retry.sleeptime.ms"; + public static final String HA_ZOOKEEPER_NUM_RETRIES = ATLAS_SERVER_HA_PREFIX + ".zookeeper.num.retries"; + public static final int DEFAULT_ZOOKEEPER_CONNECT_NUM_RETRIES = 3; + public static final String HA_ZOOKEEPER_SESSION_TIMEOUT_MS = ATLAS_SERVER_HA_PREFIX + ".zookeeper.session.timeout.ms"; + public static final int 
DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MILLIS = 20000; + + /** + * Return whether HA is enabled or not. + * @param configuration underlying configuration instance + * @return true if {@link #ATLAS_SERVER_HA_ENABLED_KEY} is set to true in the configuration; false if it is unset or false + */ + public static boolean isHAEnabled(Configuration configuration) { + return configuration.getBoolean(ATLAS_SERVER_HA_ENABLED_KEY, false); + } + + /** + * Return the ID corresponding to this Atlas instance. + * + * The match is done by looking for an ID configured in {@link HAConfiguration#ATLAS_SERVER_IDS} key + * that has a host:port entry for the key {@link HAConfiguration#ATLAS_SERVER_ADDRESS_PREFIX}+ID where + * the host is a local IP address and port is set in the system property + * {@link AtlasConstants#SYSTEM_PROPERTY_APP_PORT}. That system property is parsed with {@link Integer#parseInt(String)}, so it must be set to an integer. + * + * @param configuration underlying configuration instance + * @return the first ID listed in {@link HAConfiguration#ATLAS_SERVER_IDS} whose configured address is local and uses the application port + * @throws AtlasException if no ID is found that maps to a local IP Address or port + */ + public static String getAtlasServerId(Configuration configuration) throws AtlasException { + // ids are already trimmed by this method + String[] ids = configuration.getStringArray(ATLAS_SERVER_IDS); + String matchingServerId = null; + int appPort = Integer.parseInt(System.getProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT)); + for (String id : ids) { + String hostPort = configuration.getString(ATLAS_SERVER_ADDRESS_PREFIX +id); + if (!StringUtils.isEmpty(hostPort)) { + InetSocketAddress socketAddress; + try { + socketAddress = NetUtils.createSocketAddr(hostPort); + } catch (Exception e) { + LOG.warn("Exception while trying to get socket address for " + hostPort, e); + continue; + } + if (!socketAddress.isUnresolved() + && NetUtils.isLocalAddress(socketAddress.getAddress()) + && appPort == socketAddress.getPort()) { + LOG.info("Found matched server id " + id + " with host port: " + hostPort); + matchingServerId = id; + break; + } + } else { + LOG.info("Could not find matching address entry for id: " + id); + } + } + if (matchingServerId == null) { + String msg = String.format("Could not find server id for this instance.
" + + "Unable to find IDs matching any local host and port binding among %s", + StringUtils.join(ids, ",")); + throw new AtlasException(msg); + } + return matchingServerId; + } + + /** + * Get the web server address that a server instance with the passed ID is bound to. + * + * This method uses the property {@link SecurityProperties#TLS_ENABLED} to determine whether + * the URL is http or https. + * + * @param configuration underlying configuration + * @param serverId serverId whose host:port property is picked to build the web server address. + * @return + */ + public static String getBoundAddressForId(Configuration configuration, String serverId) { + String hostPort = configuration.getString(ATLAS_SERVER_ADDRESS_PREFIX +serverId); + boolean isSecure = configuration.getBoolean(SecurityProperties.TLS_ENABLED); + String protocol = (isSecure) ? "https://" : "http://"; + return protocol + hostPort; + } + + /** + * A collection of Zookeeper specific configuration that is used by High Availability code + */ + public static class ZookeeperProperties { + private String connectString; + private int retriesSleepTimeMillis; + private int numRetries; + private int sessionTimeout; + + public ZookeeperProperties(String connectString, int retriesSleepTimeMillis, int numRetries, + int sessionTimeout) { + this.connectString = connectString; + this.retriesSleepTimeMillis = retriesSleepTimeMillis; + this.numRetries = numRetries; + this.sessionTimeout = sessionTimeout; + } + + public String getConnectString() { + return connectString; + } + + public int getRetriesSleepTimeMillis() { + return retriesSleepTimeMillis; + } + + public int getNumRetries() { + return numRetries; + } + + public int getSessionTimeout() { + return sessionTimeout; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ZookeeperProperties that = (ZookeeperProperties) o; + + if (retriesSleepTimeMillis != 
that.retriesSleepTimeMillis) return false; + if (numRetries != that.numRetries) return false; + if (sessionTimeout != that.sessionTimeout) return false; + return !(connectString != null ? !connectString.equals(that.connectString) : that.connectString != null); + + } + + @Override + public int hashCode() { + int result = connectString != null ? connectString.hashCode() : 0; + result = 31 * result + retriesSleepTimeMillis; + result = 31 * result + numRetries; + result = 31 * result + sessionTimeout; + return result; + } + } + + public static ZookeeperProperties getZookeeperProperties(Configuration configuration) { + String zookeeperConnectString = configuration.getString("atlas.kafka.zookeeper.connect"); + if (configuration.containsKey(HA_ZOOKEEPER_CONNECT)) { + zookeeperConnectString = configuration.getString(HA_ZOOKEEPER_CONNECT); + } + + int retriesSleepTimeMillis = configuration.getInt(HA_ZOOKEEPER_RETRY_SLEEPTIME_MILLIS, + DEFAULT_ZOOKEEPER_CONNECT_SLEEPTIME_MILLIS); + + int numRetries = configuration.getInt(HA_ZOOKEEPER_NUM_RETRIES, DEFAULT_ZOOKEEPER_CONNECT_NUM_RETRIES); + + int sessionTimeout = configuration.getInt(HA_ZOOKEEPER_SESSION_TIMEOUT_MS, + DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MILLIS); + return new ZookeeperProperties(zookeeperConnectString, retriesSleepTimeMillis, numRetries, sessionTimeout); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java ---------------------------------------------------------------------- diff --git a/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java new file mode 100644 index 0000000..87a69ef --- /dev/null +++ b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.listener; + +import org.apache.atlas.AtlasException; + +/** + * An interface that should be implemented by objects and services to react to changes in state of an Atlas server. + * + * The two state transitions we handle are (1) becoming active and (2) becoming passive. + */ +public interface ActiveStateChangeHandler { + + /** + * Callback that is invoked on an implementor when this instance of Atlas server is declared the leader. + * + * Any initialization that must be carried out by an implementor only when the server becomes active + * should happen on this callback. + * + * @throws AtlasException if anything is wrong on initialization + */ + void instanceIsActive() throws AtlasException; + + /** + * Callback that is invoked on an implementor when this instance of Atlas server is removed as the leader. + * + * Any cleanup that must be carried out by an implementor when the server becomes passive + * should happen on this callback. 
+ * + * @throws AtlasException if anything is wrong on shutdown + */ + void instanceIsPassive() throws AtlasException; +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/server-api/src/main/test/org/apache/atlas/ha/HAConfigurationTest.java ---------------------------------------------------------------------- diff --git a/server-api/src/main/test/org/apache/atlas/ha/HAConfigurationTest.java b/server-api/src/main/test/org/apache/atlas/ha/HAConfigurationTest.java new file mode 100644 index 0000000..a7c9f37 --- /dev/null +++ b/server-api/src/main/test/org/apache/atlas/ha/HAConfigurationTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.ha; + +import org.apache.atlas.AtlasConstants; +import org.apache.atlas.AtlasException; +import org.apache.atlas.security.SecurityProperties; +import org.apache.commons.configuration.Configuration; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +public class HAConfigurationTest { + + @Mock + private Configuration configuration; + + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + System.setProperty(AtlasConstants.SYSTEM_PROPERTY_APP_PORT, AtlasConstants.DEFAULT_APP_PORT_STR); + } + + @Test + public void testShouldSelectRightServerAddress() throws AtlasException { + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1", "id2"}); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:31000"); + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id2")).thenReturn("127.0.0.1:21000"); + + String atlasServerId = HAConfiguration.getAtlasServerId(configuration); + assertEquals(atlasServerId, "id2"); + } + + @Test(expectedExceptions = AtlasException.class) + public void testShouldFailIfNoIDsConfiguration() throws AtlasException { + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {}); + HAConfiguration.getAtlasServerId(configuration); + fail("Should not return any server id if IDs not found in configuration"); + } + + @Test(expectedExceptions = AtlasException.class) + public void testShouldFailIfNoMatchingAddressForID() throws AtlasException { + when(configuration.getStringArray(HAConfiguration.ATLAS_SERVER_IDS)).thenReturn(new String[] {"id1", "id2"}); + 
when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:31000"); + + HAConfiguration.getAtlasServerId(configuration); + fail("Should not return any server id if no matching address found for any ID"); + } + + @Test + public void testShouldReturnHTTPBoundAddress() { + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21000"); + when(configuration.getBoolean(SecurityProperties.TLS_ENABLED)).thenReturn(false); + + String address = HAConfiguration.getBoundAddressForId(configuration, "id1"); + + assertEquals(address, "http://127.0.0.1:21000"); + } + + @Test + public void testShouldReturnHTTPSBoundAddress() { + when(configuration.getString(HAConfiguration.ATLAS_SERVER_ADDRESS_PREFIX +"id1")).thenReturn("127.0.0.1:21443"); + when(configuration.getBoolean(SecurityProperties.TLS_ENABLED)).thenReturn(true); + + String address = HAConfiguration.getBoundAddressForId(configuration, "id1"); + + assertEquals(address, "https://127.0.0.1:21443"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java ---------------------------------------------------------------------- diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java index b41f3db..402800e 100755 --- a/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java +++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/TypeSystem.java @@ -27,11 +27,14 @@ import org.apache.atlas.classification.InterfaceAudience; import org.apache.atlas.typesystem.TypesDef; import org.apache.atlas.typesystem.exception.TypeExistsException; import org.apache.atlas.typesystem.exception.TypeNotFoundException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.inject.Singleton; import 
java.lang.reflect.Constructor; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -43,6 +46,8 @@ import java.util.concurrent.ConcurrentHashMap; @Singleton @InterfaceAudience.Private public class TypeSystem { + private static final Logger LOG = LoggerFactory.getLogger(TypeSystem.class); + private static final TypeSystem INSTANCE = new TypeSystem(); private static ThreadLocal<SimpleDateFormat> dateFormat = new ThreadLocal<SimpleDateFormat>() { @Override @@ -333,7 +338,10 @@ public class TypeSystem { IDataType type = typeEntry.getValue(); //Add/replace the new type in the typesystem types.put(typeName, type); - typeCategoriesToTypeNamesMap.put(type.getTypeCategory(), typeName); + // ArrayListMultimap allows duplicates - we want to avoid this during re-activation. + if (!typeCategoriesToTypeNamesMap.containsEntry(type.getTypeCategory(), typeName)) { + typeCategoriesToTypeNamesMap.put(type.getTypeCategory(), typeName); + } } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/typesystem/src/main/resources/atlas-application.properties ---------------------------------------------------------------------- diff --git a/typesystem/src/main/resources/atlas-application.properties b/typesystem/src/main/resources/atlas-application.properties index 9a32e04..f753785 100644 --- a/typesystem/src/main/resources/atlas-application.properties +++ b/typesystem/src/main/resources/atlas-application.properties @@ -87,4 +87,8 @@ atlas.server.https.port=31443 hbase.security.authentication=simple -atlas.hook.falcon.synchronous=true \ No newline at end of file +atlas.hook.falcon.synchronous=true +######### High Availability Configuration ######## +atlas.server.ha.enabled=false +atlas.server.ids=id1 +atlas.server.address.id1=localhost:21000 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8bde666b/typesystem/src/test/java/org/apache/atlas/typesystem/types/TypeSystemTest.java ---------------------------------------------------------------------- diff --git a/typesystem/src/test/java/org/apache/atlas/typesystem/types/TypeSystemTest.java b/typesystem/src/test/java/org/apache/atlas/typesystem/types/TypeSystemTest.java index f9f5f21..a3be4c5 100755 --- a/typesystem/src/test/java/org/apache/atlas/typesystem/types/TypeSystemTest.java +++ b/typesystem/src/test/java/org/apache/atlas/typesystem/types/TypeSystemTest.java @@ -32,12 +32,14 @@ import org.testng.annotations.Test; import scala.actors.threadpool.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import static org.apache.atlas.typesystem.types.utils.TypesUtil.createClassTypeDef; import static org.apache.atlas.typesystem.types.utils.TypesUtil.createRequiredAttrDef; import static org.apache.atlas.typesystem.types.utils.TypesUtil.createStructTypeDef; import static org.apache.atlas.typesystem.types.utils.TypesUtil.createTraitTypeDef; +import static org.testng.Assert.assertTrue; public class TypeSystemTest extends BaseTest { @@ -55,7 +57,7 @@ public class TypeSystemTest extends BaseTest { public void testGetTypeNames() throws Exception { getTypeSystem().defineEnumType("enum_test", new EnumValue("0", 0), new EnumValue("1", 1), new EnumValue("2", 2), new EnumValue("3", 3)); - Assert.assertTrue(getTypeSystem().getTypeNames().contains("enum_test")); + assertTrue(getTypeSystem().getTypeNames().contains("enum_test")); } @Test @@ -65,7 +67,7 @@ public class TypeSystemTest extends BaseTest { String typeDescription = typeName + description; getTypeSystem().defineEnumType(typeName, typeDescription, new EnumValue("0", 0), new EnumValue("1", 1), new EnumValue("2", 2), new EnumValue("3", 3)); - Assert.assertTrue(getTypeSystem().getTypeNames().contains(typeName)); + 
assertTrue(getTypeSystem().getTypeNames().contains(typeName)); IDataType type = getTypeSystem().getDataType(EnumType.class, typeName); Assert.assertNotNull(type); Assert.assertEquals(type.getDescription(), typeDescription); @@ -76,7 +78,7 @@ public class TypeSystemTest extends BaseTest { .createTraitTypeDef(typeName, typeDescription, ImmutableSet.<String>of(), TypesUtil.createRequiredAttrDef("type", DataTypes.STRING_TYPE)); getTypeSystem().defineTraitType(trait); - Assert.assertTrue(getTypeSystem().getTypeNames().contains(typeName)); + assertTrue(getTypeSystem().getTypeNames().contains(typeName)); type = getTypeSystem().getDataType(TraitType.class, typeName); Assert.assertNotNull(type); Assert.assertEquals(type.getDescription(), typeDescription); @@ -87,7 +89,7 @@ public class TypeSystemTest extends BaseTest { .createClassTypeDef(typeName, typeDescription, ImmutableSet.<String>of(), TypesUtil.createRequiredAttrDef("type", DataTypes.STRING_TYPE)); getTypeSystem().defineClassType(classType); - Assert.assertTrue(getTypeSystem().getTypeNames().contains(typeName)); + assertTrue(getTypeSystem().getTypeNames().contains(typeName)); type = getTypeSystem().getDataType(ClassType.class, typeName); Assert.assertNotNull(type); Assert.assertEquals(type.getDescription(), typeDescription); @@ -95,7 +97,7 @@ public class TypeSystemTest extends BaseTest { typeName = "struct_type"; typeDescription = typeName + description; getTypeSystem().defineStructType(typeName, typeDescription, true, createRequiredAttrDef("a", DataTypes.INT_TYPE)); - Assert.assertTrue(getTypeSystem().getTypeNames().contains(typeName)); + assertTrue(getTypeSystem().getTypeNames().contains(typeName)); type = getTypeSystem().getDataType(StructType.class, typeName); Assert.assertNotNull(type); Assert.assertEquals(type.getDescription(), typeDescription); @@ -106,7 +108,7 @@ public class TypeSystemTest extends BaseTest { public void testIsRegistered() throws Exception { getTypeSystem().defineEnumType("enum_test", new 
EnumValue("0", 0), new EnumValue("1", 1), new EnumValue("2", 2), new EnumValue("3", 3)); - Assert.assertTrue(getTypeSystem().isRegistered("enum_test")); + assertTrue(getTypeSystem().isRegistered("enum_test")); } @Test @@ -182,9 +184,9 @@ public class TypeSystemTest extends BaseTest { ClassType bc = ts.getDataType(ClassType.class, "B"); ClassType cc = ts.getDataType(ClassType.class, "C"); - Assert.assertTrue(ac.compareTo(bc) < 0); - Assert.assertTrue(bc.compareTo(cc) < 0); - Assert.assertTrue(ac.compareTo(cc) < 0); + assertTrue(ac.compareTo(bc) < 0); + assertTrue(bc.compareTo(cc) < 0); + assertTrue(ac.compareTo(cc) < 0); } @Test @@ -223,4 +225,31 @@ public class TypeSystemTest extends BaseTest { Assert.assertEquals(traitNames.size(), 4); Assert.assertEquals(classNames.size(), 3); } + + @Test + public void testTypeNamesAreNotDuplicated() { + TypeSystem typeSystem = getTypeSystem(); + ImmutableList<String> traitNames = typeSystem.getTypeNamesByCategory(DataTypes.TypeCategory.TRAIT); + int numTraits = traitNames.size(); + + HashMap<String, IDataType> typesAdded = new HashMap<>(); + String traitName = "dup_type_test" + random(); + TraitType traitType = new TraitType(typeSystem, traitName, null, null, 0); + typesAdded.put(traitName, traitType); + typeSystem.commitTypes(typesAdded); + + traitNames = typeSystem.getTypeNamesByCategory(DataTypes.TypeCategory.TRAIT); + Assert.assertEquals(traitNames.size(), numTraits+1); + + // add again with another trait this time + traitName = "dup_type_test" + random(); + TraitType traitTypeNew = new TraitType(typeSystem, traitName, null, null, 0); + typesAdded.put(traitName, traitTypeNew); + + typeSystem.commitTypes(typesAdded); + traitNames = typeSystem.getTypeNamesByCategory(DataTypes.TypeCategory.TRAIT); + Assert.assertEquals(traitNames.size(), numTraits+2); + } + + }
