This is an automated email from the ASF dual-hosted git repository. pinal pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 5ed088b4a6192d2f15656870e6522fdaa37d46f3 Author: Patrik Marton <[email protected]> AuthorDate: Fri Jul 29 11:50:44 2022 +0200 ATLAS-4619: Refactor Atlas webapp module to remove Kafka core dependency Change-Id: Ie3422851cb711da4e7c4d0845319db6c33333f65 Signed-off-by: Pinal Shah <[email protected]> (cherry picked from commit 3c7d8c072055c246200a5617488ace5a1d64ae57) --- .../notification/NotificationHookConsumer.java | 25 ++++++++-------------- .../NotificationHookConsumerKafkaTest.java | 22 ++++++++----------- 2 files changed, 18 insertions(+), 29 deletions(-) diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 49c504f9f..1cdfcef8a 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -19,7 +19,6 @@ package org.apache.atlas.notification; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import kafka.utils.ShutdownableThread; import org.apache.atlas.*; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.ha.HAConfiguration; @@ -122,6 +121,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; private static final String EXCEPTION_CLASS_NAME_JANUSGRAPH_EXCEPTION = "JanusGraphException"; private static final String EXCEPTION_CLASS_NAME_PERMANENTLOCKING_EXCEPTION = "PermanentLockingException"; + private static final int KAFKA_CONSUMER_SHUTDOWN_WAIT = 30000; // from org.apache.hadoop.hive.ql.parse.SemanticAnalyzer public static final String DUMMY_DATABASE = "_dummy_database"; @@ -379,7 +379,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl if (executors != null) { executors.shutdown(); - if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) { + if (!executors.awaitTermination(KAFKA_CONSUMER_SHUTDOWN_WAIT, TimeUnit.MILLISECONDS)) { LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly"); } @@ -523,21 +523,21 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } @VisibleForTesting - class HookConsumer extends ShutdownableThread { + class HookConsumer extends Thread { private final NotificationConsumer<HookNotification> consumer; private final AtomicBoolean shouldRun = new AtomicBoolean(false); private final List<String> failedMessages = new ArrayList<>(); private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration); public HookConsumer(NotificationConsumer<HookNotification> consumer) { - super("atlas-hook-consumer-thread", false); + super("atlas-hook-consumer-thread"); this.consumer = consumer; } @Override - public void doWork() { - LOG.info("==> HookConsumer doWork()"); + public void run() { + LOG.info("==> HookConsumer run()"); shouldRun.set(true); @@ -572,12 +572,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl consumer.close(); } - LOG.info("<== HookConsumer doWork()"); + LOG.info("<== HookConsumer run()"); } } @VisibleForTesting - void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) throws AtlasServiceException, AtlasException { + void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) { AtlasPerfTracer perf = null; HookNotification message = kafkaMsg.getMessage(); String messageUser = message.getUser(); @@ -957,26 +957,19 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl return true; } - @Override public void shutdown() { LOG.info("==> HookConsumer shutdown()"); // handle the case where thread was not started at all // and shutdown called - if (shouldRun.get() == false) { + if (!shouldRun.compareAndSet(true, false)) { return; } - super.initiateShutdown(); - - shouldRun.set(false); - if (consumer != null) { consumer.wakeup(); } - super.awaitShutdown(); - LOG.info("<== HookConsumer shutdown()"); } } diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java index fdfc2560d..716f592f5 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java @@ -186,23 +186,19 @@ public class NotificationHookConsumerKafkaTest { } void consumeOneMessage(NotificationConsumer<HookNotification> consumer, - NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException { - try { - long startTime = System.currentTimeMillis(); //fetch starting time + NotificationHookConsumer.HookConsumer hookConsumer) { + long startTime = System.currentTimeMillis(); //fetch starting time - while ((System.currentTimeMillis() - startTime) < 10000) { - List<AtlasKafkaMessage<HookNotification>> messages = consumer.receive(); + while ((System.currentTimeMillis() - startTime) < 10000) { + List<AtlasKafkaMessage<HookNotification>> messages = consumer.receive(); - for (AtlasKafkaMessage<HookNotification> msg : messages) { - hookConsumer.handleMessage(msg); - } + for (AtlasKafkaMessage<HookNotification> msg : messages) { + hookConsumer.handleMessage(msg); + } - if (messages.size() > 0) { - break; - } + if (messages.size() > 0) { + break; } - } catch (AtlasServiceException | AtlasException e) { - Assert.fail("Consumer failed with exception ", e); } }
