LENS-887: Add exception handling around event-processing threads and increase pool sizes for QueryEndNotifier and ResultFormatter
Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/73f92430 Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/73f92430 Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/73f92430 Branch: refs/heads/LENS-581 Commit: 73f92430c70664cf5b8c63ec9b174a4a1b27d2ad Parents: 36166a2 Author: Puneet Gupta <[email protected]> Authored: Tue Dec 15 18:22:40 2015 +0530 Committer: Amareshwari Sriramadasu <[email protected]> Committed: Tue Dec 15 18:22:40 2015 +0530 ---------------------------------------------------------------------- .../server/api/events/AsyncEventListener.java | 50 +++++++++----- .../apache/lens/server/EventServiceImpl.java | 9 ++- .../lens/server/query/QueryEndNotifier.java | 72 +++++++++++--------- .../lens/server/query/ResultFormatter.java | 5 ++ .../lens/server/query/TestEventService.java | 45 ++++++++++++ 5 files changed, 131 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/73f92430/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java b/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java index 547c008..84728e5 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/events/AsyncEventListener.java @@ -22,12 +22,18 @@ import java.util.concurrent.*; import org.apache.lens.server.api.error.LensException; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; + +import lombok.AccessLevel; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; /** * Event listeners should implement this class if they wish to process events 
asynchronously. This should be used when * event processing can block, or is computationally intensive. * * @param <T> the generic type */ +@Slf4j public abstract class AsyncEventListener<T extends LensEvent> implements LensEventListener<T> { /** @@ -41,49 +47,57 @@ public abstract class AsyncEventListener<T extends LensEvent> implements LensEve protected final BlockingQueue<Runnable> eventQueue; /** + * Name of this Asynchronous Event Listener. Will be used for logging and to name the threads in thread pool that + * allow asynchronous handling of events. If required, Sub Classes can override <code>getName</code> method to + * provide more appropriate name. + * + * Default value is the class Name (Example QueryEndNotifier, ResultFormatter, etc) + */ + @Getter(AccessLevel.PROTECTED) + private final String name = this.getClass().getSimpleName(); + + /** * Create a single threaded event listener with an unbounded queue, with daemon threads. */ public AsyncEventListener() { - this(1); + this(1, 1); } /** * Create a event listener with poolSize threads with an unbounded queue and daemon threads. * * @param poolSize the pool size + * @param maxPoolSize the max pool size */ - public AsyncEventListener(int poolSize) { - this(poolSize, -1, 10, true); + public AsyncEventListener(int poolSize, int maxPoolSize) { + this(poolSize, maxPoolSize, -1, 10, true); } /** * Create an asynchronous event listener which uses a thread poool to process events. 
* * @param poolSize size of the event processing pool + * @param maxPoolSize the max pool size * @param maxQueueSize max size of the event queue, if this is non positive, then the queue is unbounded * @param timeOutSeconds time out in seconds when an idle thread is destroyed * @param isDaemon if the threads used to process should be daemon threads, * if false, then implementation should call stop() * to stop the thread pool */ - public AsyncEventListener(int poolSize, int maxQueueSize, long timeOutSeconds, final boolean isDaemon) { + public AsyncEventListener(int poolSize, int maxPoolSize, int maxQueueSize, long timeOutSeconds, + final boolean isDaemon) { if (maxQueueSize <= 0) { eventQueue = new LinkedBlockingQueue<Runnable>(); } else { eventQueue = new ArrayBlockingQueue<Runnable>(maxQueueSize); } - processor = new ThreadPoolExecutor(poolSize, poolSize, timeOutSeconds, TimeUnit.SECONDS, eventQueue, - new ThreadFactory() { - @Override - public Thread newThread(Runnable runnable) { - Thread th = new Thread(runnable); - th.setName("event_processor_thread"); - th.setDaemon(isDaemon); - return th; - } - }); - processor.allowCoreThreadTimeOut(true); + ThreadFactory factory = new BasicThreadFactory.Builder() + .namingPattern(getName()+"_AsyncThread-%d") + .daemon(isDaemon) + .priority(Thread.NORM_PRIORITY) + .build(); + processor = new ThreadPoolExecutor(poolSize, maxPoolSize, timeOutSeconds, TimeUnit.SECONDS, eventQueue, factory); } /** @@ -98,7 +112,11 @@ public abstract class AsyncEventListener<T extends LensEvent> implements LensEve processor.execute(new Runnable() { @Override public void run() { - process(event); + try { + process(event); + } catch (Throwable e) { + log.error("{} Failed to process event {}", getName(), event, e); + } } }); } catch (RejectedExecutionException rejected) { http://git-wip-us.apache.org/repos/asf/lens/blob/73f92430/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java 
---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java index a276828..369885d 100644 --- a/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/EventServiceImpl.java @@ -21,6 +21,7 @@ package org.apache.lens.server; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import org.apache.lens.server.api.LensConfConstants; import org.apache.lens.server.api.error.LensException; @@ -29,6 +30,7 @@ import org.apache.lens.server.api.events.LensEventListener; import org.apache.lens.server.api.events.LensEventService; import org.apache.lens.server.api.health.HealthStatus; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.service.AbstractService; @@ -64,8 +66,13 @@ public class EventServiceImpl extends AbstractService implements LensEventServic @Override public synchronized void init(HiveConf hiveConf) { int numProcs = Runtime.getRuntime().availableProcessors(); + ThreadFactory factory = new BasicThreadFactory.Builder() + .namingPattern("Event_Service_Thread-%d") + .daemon(false) + .priority(Thread.NORM_PRIORITY) + .build(); eventHandlerPool = Executors.newFixedThreadPool(hiveConf.getInt(LensConfConstants.EVENT_SERVICE_THREAD_POOL_SIZE, - numProcs)); + numProcs), factory); super.init(hiveConf); } http://git-wip-us.apache.org/repos/asf/lens/blob/73f92430/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java index 110624a..ca00b4d 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java @@ -77,12 +77,17 @@ public class QueryEndNotifier extends AsyncEventListener<QueryEnded> { private final LogSegregationContext logSegregationContext; + /** QueryEndNotifier core and max pool size */ + private static final int CORE_POOL_SIZE = 2; + private static final int MAX_POOL_SIZE = 5; + /** Instantiates a new query end notifier. * * @param queryService the query service * @param hiveConf the hive conf */ public QueryEndNotifier(QueryExecutionServiceImpl queryService, HiveConf hiveConf, @NonNull final LogSegregationContext logSegregationContext) { + super(CORE_POOL_SIZE, MAX_POOL_SIZE); this.queryService = queryService; HiveConf conf = hiveConf; from = conf.get(MAIL_FROM_ADDRESS); @@ -113,23 +118,30 @@ public class QueryEndNotifier extends AsyncEventListener<QueryEnded> { boolean whetherMailNotify = Boolean.parseBoolean(queryContext.getConf().get(QUERY_MAIL_NOTIFY, WHETHER_MAIL_NOTIFY_DEFAULT)); - if (!whetherMailNotify) { return; } - String queryName = queryContext.getQueryName(); - String mailSubject = "Query " + (StringUtils.isBlank(queryName) ? "" : (queryName + " ")) - + queryContext.getStatus().getStatus() + ": " + event.getQueryHandle(); + try { + //Create and Send EMAIL + String queryName = queryContext.getQueryName(); + String mailSubject = "Query " + (StringUtils.isBlank(queryName) ? 
"" : (queryName + " ")) + + queryContext.getStatus().getStatus() + ": " + event.getQueryHandle(); - String mailMessage = createMailMessage(queryContext); + String mailMessage = createMailMessage(queryContext); - String to = queryContext.getSubmittedUser() + "@" + queryService.getServerDomain(); + String to = queryContext.getSubmittedUser() + "@" + queryService.getServerDomain(); - String cc = queryContext.getConf().get(QUERY_RESULT_EMAIL_CC, QUERY_RESULT_DEFAULT_EMAIL_CC); + String cc = queryContext.getConf().get(QUERY_RESULT_EMAIL_CC, QUERY_RESULT_DEFAULT_EMAIL_CC); - log.info("Sending completion email for query handle: {}", event.getQueryHandle()); - sendMail(host, port, new Email(from, to, cc, mailSubject, mailMessage), mailSmtpTimeout, mailSmtpConnectionTimeout); + log.info("Sending completion email for query handle: {}", event.getQueryHandle()); + sendMail(host, port, new Email(from, to, cc, mailSubject, mailMessage), mailSmtpTimeout, + mailSmtpConnectionTimeout); + } catch (Exception e) { + MetricsService metricsService = LensServices.get().getService(MetricsService.NAME); + metricsService.incrCounter(QueryEndNotifier.class, EMAIL_ERROR_COUNTER); + log.error("Error sending query end email", e); + } } /** Creates the mail message. 
@@ -184,38 +196,32 @@ public class QueryEndNotifier extends AsyncEventListener<QueryEnded> { * @param mailSmtpTimeout the mail smtp timeout * @param mailSmtpConnectionTimeout the mail smtp connection timeout */ public static void sendMail(String host, String port, - Email email, int mailSmtpTimeout, int mailSmtpConnectionTimeout) { + Email email, int mailSmtpTimeout, int mailSmtpConnectionTimeout) throws Exception{ Properties props = System.getProperties(); props.put("mail.smtp.host", host); props.put("mail.smtp.port", port); props.put("mail.smtp.timeout", mailSmtpTimeout); props.put("mail.smtp.connectiontimeout", mailSmtpConnectionTimeout); Session session = Session.getDefaultInstance(props, null); - try { - MimeMessage message = new MimeMessage(session); - message.setFrom(new InternetAddress(email.getFrom())); - for (String recipient : email.getTo().trim().split("\\s*,\\s*")) { - message.addRecipients(Message.RecipientType.TO, InternetAddress.parse(recipient)); - } - if (email.getCc() != null && email.getCc().length() > 0) { - for (String recipient : email.getCc().trim().split("\\s*,\\s*")) { - message.addRecipients(Message.RecipientType.CC, InternetAddress.parse(recipient)); - } + MimeMessage message = new MimeMessage(session); + message.setFrom(new InternetAddress(email.getFrom())); + for (String recipient : email.getTo().trim().split("\\s*,\\s*")) { + message.addRecipients(Message.RecipientType.TO, InternetAddress.parse(recipient)); + } + if (email.getCc() != null && email.getCc().length() > 0) { + for (String recipient : email.getCc().trim().split("\\s*,\\s*")) { + message.addRecipients(Message.RecipientType.CC, InternetAddress.parse(recipient)); } - message.setSubject(email.getSubject()); - message.setSentDate(new Date()); + } + message.setSubject(email.getSubject()); + message.setSentDate(new Date()); - MimeBodyPart messagePart = new MimeBodyPart(); - messagePart.setText(email.getMessage()); - Multipart multipart = new MimeMultipart(); + MimeBodyPart 
messagePart = new MimeBodyPart(); + messagePart.setText(email.getMessage()); + Multipart multipart = new MimeMultipart(); - multipart.addBodyPart(messagePart); - message.setContent(multipart); - Transport.send(message); - } catch (Exception e) { - MetricsService metricsService = LensServices.get().getService(MetricsService.NAME); - metricsService.incrCounter(QueryEndNotifier.class, EMAIL_ERROR_COUNTER); - log.error("Error sending query end email", e); - } + multipart.addBodyPart(messagePart); + message.setContent(multipart); + Transport.send(message); } } http://git-wip-us.apache.org/repos/asf/lens/blob/73f92430/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java b/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java index f568b17..9955278 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java @@ -46,6 +46,10 @@ public class ResultFormatter extends AsyncEventListener<QueryExecuted> { /** The query service. 
*/ QueryExecutionServiceImpl queryService; + /** ResultFormatter core and max pool size */ + private static final int CORE_POOL_SIZE = 5; + private static final int MAX_POOL_SIZE = 10; + private final LogSegregationContext logSegregationContext; /** @@ -54,6 +58,7 @@ public class ResultFormatter extends AsyncEventListener<QueryExecuted> { * @param queryService the query service */ public ResultFormatter(QueryExecutionServiceImpl queryService, @NonNull LogSegregationContext logSegregationContext) { + super(CORE_POOL_SIZE, MAX_POOL_SIZE); this.queryService = queryService; this.logSegregationContext = logSegregationContext; } http://git-wip-us.apache.org/repos/asf/lens/blob/73f92430/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java b/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java index 702a529..a2ca17f 100644 --- a/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java +++ b/lens-server/src/test/java/org/apache/lens/server/query/TestEventService.java @@ -20,6 +20,9 @@ package org.apache.lens.server.query; import static org.testng.Assert.*; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -503,4 +506,46 @@ public class TestEventService { } + @Test + public void testAysncEventListenerPoolThreads(){ + AsyncEventListener<QuerySuccess> ayncListener = new DummyAsncEventListener(); + for(int i=0; i<10; i++){ + try { + //A pool thread is created each time an event is submitted until core pool size is reached which is 5 + //for this test case. 
@see org.apache.lens.server.api.events.AsyncEventListener.processor + ayncListener.onEvent(null); + } catch (LensException e) { + assert(false); //Not Expected + } + } + + //Verify the core pool Threads after the events have been fired + ThreadGroup currentTG = Thread.currentThread().getThreadGroup(); + int count = currentTG.activeCount(); + Thread[] threads = new Thread[count]; + currentTG.enumerate(threads); + Set<String> aysncThreadNames = new HashSet<String>(); + for(Thread t : threads){ + if (t.getName().contains("DummyAsncEventListener_AsyncThread")){ + aysncThreadNames.add(t.getName()); + } + } + assertTrue(aysncThreadNames.containsAll(Arrays.asList( + "DummyAsncEventListener_AsyncThread-1", + "DummyAsncEventListener_AsyncThread-2", + "DummyAsncEventListener_AsyncThread-3", + "DummyAsncEventListener_AsyncThread-4", + "DummyAsncEventListener_AsyncThread-5"))); + } + + private static class DummyAsncEventListener extends AsyncEventListener<QuerySuccess> { + public DummyAsncEventListener(){ + super(5, 10); //core pool = 5 and max Pool size =10 + } + @Override + public void process(QuerySuccess event) { + throw new RuntimeException("Simulated Exception"); + } + } + }
