Repository: incubator-atlas Updated Branches: refs/heads/branch-0.6-incubating 85fc40c05 -> 5f0a5ca22
ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown (yhmenath via sumasai) Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/5f0a5ca2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/5f0a5ca2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/5f0a5ca2 Branch: refs/heads/branch-0.6-incubating Commit: 5f0a5ca221d13def0c06877d9453615efdecbf83 Parents: 85fc40c Author: Suma Shivaprasad <[email protected]> Authored: Mon Dec 14 16:39:25 2015 +0530 Committer: Suma Shivaprasad <[email protected]> Committed: Mon Dec 14 16:39:25 2015 +0530 ---------------------------------------------------------------------- client/pom.xml | 6 ++ .../main/java/org/apache/atlas/AtlasClient.java | 22 +++++- .../java/org/apache/atlas/AtlasClientTest.java | 67 ++++++++++++++++ .../notification/NotificationHookConsumer.java | 42 ++++++++++ .../NotificationHookConsumerTest.java | 82 ++++++++++++++++++++ pom.xml | 2 +- release-log.txt | 1 + .../atlas/web/listeners/GuiceServletConfig.java | 2 +- 8 files changed, 221 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/5f0a5ca2/client/pom.xml ---------------------------------------------------------------------- diff --git a/client/pom.xml b/client/pom.xml index 01281d0..8691843 100755 --- a/client/pom.xml +++ b/client/pom.xml @@ -67,5 +67,11 @@ <groupId>org.testng</groupId> <artifactId>testng</artifactId> </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/5f0a5ca2/client/src/main/java/org/apache/atlas/AtlasClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java index becc4db..b108b25 100755 --- a/client/src/main/java/org/apache/atlas/AtlasClient.java +++ b/client/src/main/java/org/apache/atlas/AtlasClient.java @@ -19,6 +19,7 @@ package org.apache.atlas; import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientHandlerException; import com.sun.jersey.api.client.ClientResponse; import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.config.DefaultClientConfig; @@ -67,6 +68,7 @@ public class AtlasClient { public static final String DATATYPE = "dataType"; public static final String BASE_URI = "api/atlas/"; + public static final String ADMIN_VERSION = "admin/version"; public static final String TYPES = "types"; public static final String URI_ENTITY = "entities"; public static final String URI_SEARCH = "discovery/search"; @@ -126,11 +128,29 @@ public class AtlasClient { service = client.resource(UriBuilder.fromUri(baseUrl).build()); } + // for testing + AtlasClient(WebResource service) { + this.service = service; + } + protected Configuration getClientProperties() throws AtlasException { return ApplicationProperties.get(); } - enum API { + public boolean isServerReady() throws AtlasServiceException { + WebResource resource = getResource(API.VERSION); + try { + callAPIWithResource(API.VERSION, resource); + return true; + } catch (ClientHandlerException che) { + return false; + } + } + + public enum API { + + //Admin operations + VERSION(BASE_URI + ADMIN_VERSION, HttpMethod.GET, Response.Status.OK), //Type operations CREATE_TYPE(BASE_URI + TYPES, HttpMethod.POST, Response.Status.CREATED), http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/5f0a5ca2/client/src/test/java/org/apache/atlas/AtlasClientTest.java ---------------------------------------------------------------------- diff --git a/client/src/test/java/org/apache/atlas/AtlasClientTest.java b/client/src/test/java/org/apache/atlas/AtlasClientTest.java new file mode 100644 index 0000000..1e7eed1 --- /dev/null +++ b/client/src/test/java/org/apache/atlas/AtlasClientTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas; + +import com.sun.jersey.api.client.ClientHandlerException; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import org.testng.annotations.Test; + +import javax.ws.rs.core.Response; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class AtlasClientTest { + + @Test + public void shouldVerifyServerIsReady() throws AtlasServiceException { + WebResource webResource = mock(WebResource.class); + AtlasClient atlasClient = new AtlasClient(webResource); + + WebResource.Builder builder = setupBuilder(webResource); + ClientResponse response = mock(ClientResponse.class); + when(response.getStatus()).thenReturn(Response.Status.OK.getStatusCode()); + when(response.getEntity(String.class)).thenReturn("{\"Version\":\"version-rrelease\",\"Name\":\"apache-atlas\"," + + "\"Description\":\"Metadata Management and Data Governance Platform over Hadoop\"}"); + when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenReturn(response); + + assertTrue(atlasClient.isServerReady()); + } + + private WebResource.Builder setupBuilder(WebResource webResource) { + WebResource adminVersionResource = mock(WebResource.class); + when(webResource.path(AtlasClient.API.VERSION.getPath())).thenReturn(adminVersionResource); + WebResource.Builder builder = mock(WebResource.Builder.class); + when(adminVersionResource.accept(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder); + when(builder.type(AtlasClient.JSON_MEDIA_TYPE)).thenReturn(builder); + return builder; + } + + @Test + public void shouldReturnFalseIfServerIsNotReady() throws AtlasServiceException { + WebResource webResource = mock(WebResource.class); + AtlasClient atlasClient = new AtlasClient(webResource); + WebResource.Builder builder = setupBuilder(webResource); + when(builder.method(AtlasClient.API.VERSION.getMethod(), ClientResponse.class, null)).thenThrow( + new ClientHandlerException()); + assertFalse(atlasClient.isServerReady()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/5f0a5ca2/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index ffeb406..1bee26f 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -22,6 +22,7 @@ import com.google.inject.Singleton; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; +import org.apache.atlas.AtlasServiceException; import org.apache.atlas.service.Service; import org.apache.commons.configuration.Configuration; import org.codehaus.jettison.json.JSONArray; @@ -42,6 +43,7 @@ public class NotificationHookConsumer implements Service { public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; public static final String ATLAS_ENDPOINT_PROPERTY = "atlas.rest.address"; + public static final int SERVER_READY_WAIT_TIME_MS = 1000; @Inject private NotificationInterface notificationInterface; @@ -77,15 +79,32 @@ public class NotificationHookConsumer implements Service { } } + static class Timer { + public void sleep(int interval) throws InterruptedException { + Thread.sleep(interval); + } + } + class HookConsumer implements Runnable { private final NotificationConsumer<JSONArray> consumer; + private final AtlasClient client; public HookConsumer(NotificationConsumer<JSONArray> consumer) { + this(atlasClient, consumer); + } + + public HookConsumer(AtlasClient client, NotificationConsumer<JSONArray> consumer) { + this.client = client; this.consumer = consumer; } @Override public void run() { + + if (!serverAvailable(new NotificationHookConsumer.Timer())) { + return; + } + while(consumer.hasNext()) { JSONArray entityJson = consumer.next(); LOG.info("Processing message {}", entityJson); @@ -98,5 +117,28 @@ public class NotificationHookConsumer implements Service { } } } + + boolean serverAvailable(Timer timer) { + try { + while (!client.isServerReady()) { + try { + LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...", + SERVER_READY_WAIT_TIME_MS); + timer.sleep(SERVER_READY_WAIT_TIME_MS); + } catch (InterruptedException e) { + LOG.info("Interrupted while waiting for Atlas Server to become ready, " + + "exiting consumer thread.", e); + return false; + } + } + } catch (AtlasServiceException e) { + LOG.info( + "Handled AtlasServiceException while waiting for Atlas Server to become ready, " + + "exiting consumer thread.", e); + return false; + } + LOG.info("Atlas Server is ready, can start reading Kafka events."); + return true; + } } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/5f0a5ca2/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java new file mode 100644 index 0000000..e4d7f8c --- /dev/null +++ b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification; + +import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasServiceException; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.*; +import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertTrue; + +public class NotificationHookConsumerTest { + + @Test + public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException { + AtlasClient atlasClient = mock(AtlasClient.class); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(); + NotificationHookConsumer.HookConsumer hookConsumer = + notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class)); + NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); + when(atlasClient.isServerReady()).thenReturn(true); + + assertTrue(hookConsumer.serverAvailable(timer)); + + verifyZeroInteractions(timer); + } + + @Test + public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException { + AtlasClient atlasClient = mock(AtlasClient.class); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(); + NotificationHookConsumer.HookConsumer hookConsumer = + notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class)); + NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); + when(atlasClient.isServerReady()).thenReturn(false, false, false, true); + + assertTrue(hookConsumer.serverAvailable(timer)); + + verify(timer, times(3)).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS); + } + + @Test + public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException { + AtlasClient atlasClient = mock(AtlasClient.class); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(); + NotificationHookConsumer.HookConsumer hookConsumer = + notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class)); + NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); + doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS); + when(atlasClient.isServerReady()).thenReturn(false); + + assertFalse(hookConsumer.serverAvailable(timer)); + } + + @Test + public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException { + AtlasClient atlasClient = mock(AtlasClient.class); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(); + NotificationHookConsumer.HookConsumer hookConsumer = + notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class)); + NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); + when(atlasClient.isServerReady()).thenThrow(new AtlasServiceException(AtlasClient.API.VERSION, new Exception())); + + assertFalse(hookConsumer.serverAvailable(timer)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/5f0a5ca2/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 71a74a8..1a324b7 100755 --- a/pom.xml +++ b/pom.xml @@ -323,7 +323,7 @@ <node.version>v0.10.30</node.version> <slf4j.version>1.7.7</slf4j.version> <jetty.version>9.2.12.v20150709</jetty.version> - <jersey.version>1.10</jersey.version> + <jersey.version>1.19</jersey.version> <jackson.version>1.8.3</jackson.version> <tinkerpop.version>2.6.0</tinkerpop.version> <titan.version>0.5.4</titan.version> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/5f0a5ca2/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 42c9727..b68d3dc 100644 --- a/release-log.txt +++ b/release-log.txt @@ -9,6 +9,7 @@ ATLAS-54 Rename configs in hive hook (shwethags) ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags) ALL CHANGES: +ATLAS-346 Atlas server loses messages sent from Hive hook if restarted after unclean shutdown(yhemath via sumasai) ATLAS-382 Fixed Hive Bridge doc for ATLAS cluster name (sumasai) ATLAS-244 UI: Add Tag Tab (darshankumar89 via sumasai) ATLAS-376 UI: Use the Schema API of the backend to populate details for Schema tab (darshankumar89 via sumasai) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/5f0a5ca2/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java index f0d80cb..c1f6a9b 100755 --- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java +++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java @@ -124,7 +124,7 @@ public class GuiceServletConfig extends GuiceServletContextListener { } protected void startServices() { - LOG.debug("Starting services"); + LOG.info("Starting services"); Services services = injector.getInstance(Services.class); services.start(); }
