ATLAS-1944: Changed HookConsumer to extend Kafka's ShutdownableThread instead of implementing Runnable (run() becomes doWork(), stop() becomes shutdown()) Signed-off-by: Madhan Neethiraj <mad...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/18745cf4 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/18745cf4 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/18745cf4 Branch: refs/heads/feature-odf Commit: 18745cf4b98af9c45e853daa280342dde8da1300 Parents: b0470f5 Author: ashutoshm <ames...@hortonworks.com> Authored: Wed Jul 12 14:43:45 2017 -0700 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Thu Jul 13 13:25:15 2017 -0700 ---------------------------------------------------------------------- .../notification/NotificationHookConsumer.java | 56 +++++++++++--------- 1 file changed, 31 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/18745cf4/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 9e5b864..0dea0e2 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -19,6 +19,7 @@ package org.apache.atlas.notification; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import kafka.utils.ShutdownableThread; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasServiceException; @@ -28,7 +29,11 @@ import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.kafka.AtlasKafkaMessage; import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.model.instance.AtlasEntity; +import 
org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest; +import org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest; import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest; +import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest; +import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream; @@ -41,11 +46,12 @@ import org.apache.atlas.web.filters.AuditFilter; import org.apache.atlas.web.service.ServiceState; import org.apache.atlas.web.util.DateTimeHelper; import org.apache.commons.configuration.Configuration; +import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; -import org.apache.kafka.common.TopicPartition; + import javax.inject.Inject; import java.util.ArrayList; import java.util.Date; @@ -56,14 +62,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static org.apache.atlas.AtlasClientV2.CREATE_ENTITY; -import static org.apache.atlas.AtlasClientV2.DELETE_ENTITY_BY_ATTRIBUTE; -import static org.apache.atlas.AtlasClientV2.UPDATE_ENTITY; -import static org.apache.atlas.AtlasClientV2.UPDATE_ENTITY_BY_ATTRIBUTE; -import static org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest; -import static org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest; -import static org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest; -import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; +import static org.apache.atlas.AtlasClientV2.*; /** * 
Consumer of notifications from hooks e.g., hive hook etc. @@ -80,7 +79,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries"; public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize"; - public static final String CONSUMER_RETRY_INTERVAL="atlas.notification.consumer.retry.interval"; + public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval"; public static final int SERVER_READY_WAIT_TIME_MS = 1000; private final AtlasEntityStore atlasEntityStore; @@ -177,7 +176,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl /** * Start Kafka consumer threads that read from Kafka topic when server is activated. - * + * <p> * Since the consumers create / update entities to the shared backend store, only the active instance * should perform this activity. Hence, these threads are started only on server activation. */ @@ -189,7 +188,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl /** * Stop Kafka consumer threads that read from Kafka topic when server is de-activated. - * + * <p> * Since the consumers create / update entities to the shared backend store, only the active instance * should perform this activity. Hence, these threads are stopped only on server deactivation. 
*/ @@ -205,18 +204,18 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } } - class HookConsumer implements Runnable { + class HookConsumer extends ShutdownableThread { private final NotificationConsumer<HookNotificationMessage> consumer; private final AtomicBoolean shouldRun = new AtomicBoolean(false); private List<HookNotificationMessage> failedMessages = new ArrayList<>(); public HookConsumer(NotificationConsumer<HookNotificationMessage> consumer) { + super("atlas-hook-consumer-thread", false); this.consumer = consumer; } - @Override - public void run() { + public void doWork() { shouldRun.set(true); if (!serverAvailable(new NotificationHookConsumer.Timer())) { @@ -226,7 +225,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl while (shouldRun.get()) { try { List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(1000L); - for (AtlasKafkaMessage<HookNotificationMessage> msg : messages){ + for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) { handleMessage(msg); } } catch (Throwable t) { @@ -267,15 +266,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl if (numRetries == 0) { // audit only on the first attempt audit(messageUser, UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(), - String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), partialUpdateRequest.getTypeName())); + String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), partialUpdateRequest.getTypeName())); } Referenceable referenceable = partialUpdateRequest.getEntity(); entities = instanceConverter.toAtlasEntity(referenceable); AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName()); - String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>(){ - { put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue()); } + String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, 
new HashMap<String, Object>() { + { + put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue()); + } }); // There should only be one root entity @@ -289,13 +290,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl if (numRetries == 0) { // audit only on the first attempt audit(messageUser, DELETE_ENTITY_BY_ATTRIBUTE.getMethod(), - String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), deleteRequest.getTypeName())); + String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), deleteRequest.getTypeName())); } try { AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName()); atlasEntityStore.deleteByUniqueAttributes(type, - new HashMap<String, Object>() {{ put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue()); }}); + new HashMap<String, Object>() {{ + put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue()); + }}); } catch (ClassCastException cle) { LOG.error("Failed to do a partial update on Entity"); } @@ -319,10 +322,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl break; } catch (Throwable e) { LOG.warn("Error handling message", e); - try{ + try { LOG.info("Sleeping for {} ms before retry", consumerRetryInterval); Thread.sleep(consumerRetryInterval); - }catch (InterruptedException ie){ + } catch (InterruptedException ie) { LOG.error("Notification consumer thread sleep interrupted"); } @@ -379,9 +382,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl return true; } - public void stop() { + @Override + public void shutdown() { + super.initiateShutdown(); shouldRun.set(false); consumer.close(); + super.awaitShutdown(); } } @@ -393,4 +399,4 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST, DateTimeHelper.formatDateUTC(new Date())); } -} +} \ No newline at end of file