Use EventBus instead of BufferedAuditLogger custom implementation
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0a1e572e Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0a1e572e Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0a1e572e Branch: refs/heads/audit_logging Commit: 0a1e572e9ce9f9b5eb2a7710c5416fbf97e1a151 Parents: 1678b34 Author: Daniel Gergely <[email protected]> Authored: Wed Mar 23 15:22:49 2016 +0100 Committer: Toader, Sebastian <[email protected]> Committed: Thu Mar 24 13:06:50 2016 +0100 ---------------------------------------------------------------------- .../ambari/server/audit/AsyncAuditLogger.java | 85 +++++++++ .../server/audit/AuditLoggerDefaultImpl.java | 2 + .../ambari/server/audit/AuditLoggerModule.java | 8 +- .../server/audit/BufferedAuditLogger.java | 143 --------------- .../server/audit/BufferedAuditLoggerTest.java | 174 ------------------- 5 files changed, 90 insertions(+), 322 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/0a1e572e/ambari-server/src/main/java/org/apache/ambari/server/audit/AsyncAuditLogger.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/audit/AsyncAuditLogger.java b/ambari-server/src/main/java/org/apache/ambari/server/audit/AsyncAuditLogger.java new file mode 100644 index 0000000..ac96391 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/audit/AsyncAuditLogger.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.audit; + + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.ambari.server.audit.event.AuditEvent; + +import com.google.common.eventbus.AsyncEventBus; +import com.google.common.eventbus.EventBus; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.google.inject.name.Named; + +/** + * This is a wrapper for an audit log implementation that uses {@link EventBus} to make audit logging asynchronous + */ +@Singleton +class AsyncAuditLogger implements AuditLogger { + /** + * Name for guice injection + */ + final static String InnerLogger = "AsyncAuditLogger"; + + /** + * Event bus that holds audit event objects + */ + private EventBus eventBus; + + /** + * Constructor. + * + * @param auditLogger the audit logger to use + */ + @Inject + public AsyncAuditLogger(@Named(InnerLogger) AuditLogger auditLogger) { + eventBus = new AsyncEventBus("AuditLoggerEventBus", new ThreadPoolExecutor(0, 1, 5L, TimeUnit.MINUTES, + new LinkedBlockingQueue<Runnable>(), new AuditLogThreadFactory(), + new ThreadPoolExecutor.CallerRunsPolicy())); + eventBus.register(auditLogger); + } + + @Override + public void log(AuditEvent event) { + eventBus.post(event); + } + + /** + * A custom {@link ThreadFactory} for the threads that logs audit events + */ + private static final class AuditLogThreadFactory implements ThreadFactory { + + private static final AtomicInteger nextId = new AtomicInteger(1); + + /** + * {@inheritDoc} + */ + @Override + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, "auditlog-" + nextId.getAndIncrement()); + thread.setDaemon(false); + return thread; + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/0a1e572e/ambari-server/src/main/java/org/apache/ambari/server/audit/AuditLoggerDefaultImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/audit/AuditLoggerDefaultImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/audit/AuditLoggerDefaultImpl.java index adac54a..16d568d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/audit/AuditLoggerDefaultImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/audit/AuditLoggerDefaultImpl.java @@ -26,6 +26,7 @@ import org.apache.ambari.server.audit.event.AuditEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.eventbus.Subscribe; import com.google.inject.Singleton; /** @@ -40,6 +41,7 @@ public class AuditLoggerDefaultImpl implements AuditLogger { * {@inheritDoc} */ @Override + @Subscribe public void log(AuditEvent event) { Date date = new Date(event.getTimestamp()); //2016-03-11T10:42:36.376Z http://git-wip-us.apache.org/repos/asf/ambari/blob/0a1e572e/ambari-server/src/main/java/org/apache/ambari/server/audit/AuditLoggerModule.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/audit/AuditLoggerModule.java b/ambari-server/src/main/java/org/apache/ambari/server/audit/AuditLoggerModule.java index 876c4d9..b20714b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/audit/AuditLoggerModule.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/audit/AuditLoggerModule.java @@ -46,10 +46,8 @@ import org.apache.ambari.server.audit.request.eventcreator.UserEventCreator; import org.apache.ambari.server.audit.request.eventcreator.ValidationIgnoreEventCreator; import org.apache.ambari.server.audit.request.eventcreator.ViewInstanceEventCreator; import org.apache.ambari.server.audit.request.eventcreator.ViewPrivilegeEventCreator; -import org.apache.ambari.server.configuration.Configuration; import com.google.inject.AbstractModule; -import com.google.inject.Inject; import com.google.inject.multibindings.Multibinder; import com.google.inject.name.Names; @@ -57,10 +55,10 @@ public class AuditLoggerModule extends AbstractModule { @Override protected void configure() { - bind(AuditLogger.class).to(BufferedAuditLogger.class); + bind(AuditLogger.class).to(AsyncAuditLogger.class); - // set AuditLoggerDefaultImpl to be used by BufferedAuditLogger - bind(AuditLogger.class).annotatedWith(Names.named(BufferedAuditLogger.InnerLogger)).to(AuditLoggerDefaultImpl.class); + // set AuditLoggerDefaultImpl to be used by AsyncAuditLogger + bind(AuditLogger.class).annotatedWith(Names.named(AsyncAuditLogger.InnerLogger)).to(AuditLoggerDefaultImpl.class); // binding for audit event creators Multibinder<RequestAuditEventCreator> auditLogEventCreatorBinder = Multibinder.newSetBinder(binder(), RequestAuditEventCreator.class); http://git-wip-us.apache.org/repos/asf/ambari/blob/0a1e572e/ambari-server/src/main/java/org/apache/ambari/server/audit/BufferedAuditLogger.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/audit/BufferedAuditLogger.java b/ambari-server/src/main/java/org/apache/ambari/server/audit/BufferedAuditLogger.java deleted file mode 100644 index e11bfd5..0000000 --- a/ambari-server/src/main/java/org/apache/ambari/server/audit/BufferedAuditLogger.java +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ambari.server.audit; - - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; - -import org.apache.ambari.server.audit.event.AuditEvent; -import org.apache.ambari.server.configuration.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.inject.Inject; -import com.google.inject.Singleton; -import com.google.inject.name.Named; - -/** - * This is a decorator that adds buffering and running on separate thread (instead of the tread of the caller) - * to an existing audit logger implementation. - * It uses a bounded queue for holding audit events before they are logged. - */ -@Singleton -public class BufferedAuditLogger implements AuditLogger { - - private static final Logger LOG = LoggerFactory.getLogger(BufferedAuditLogger.class); - - /** - * Capacity of the buffer - */ - private final int bufferCapacity; - - /** - * Buffer capacity status - */ - private final double capacityWaterMark; - - /** - * Wrapped audit logger - */ - private final AuditLogger auditLogger; - - /** - * Thread pool - */ - private final ExecutorService pool; - - /** - * Names for guice injection - */ - public final static String InnerLogger = "BufferedAuditLogger"; - public final static String Capacity = "BufferedAuditLogger.capacity"; - - - /** - * The queue that holds the audit events to be logged in case there are - * peeks when the producers logs audit events at a higher pace than - * this audit logger can consume. - */ - protected final BlockingQueue<AuditEvent> auditEventWorkQueue; - - private class Consumer implements Runnable { - @Override - public void run() { - while (true) { - try { - AuditEvent auditEvent = auditEventWorkQueue.take(); - auditLogger.log(auditEvent); - } catch (InterruptedException ex) { - LOG.error("Logging of audit events interrupted ! There are {} audit events left unlogged !", auditEventWorkQueue.size()); - - pool.shutdownNow(); - Thread.currentThread().interrupt(); - - } catch (Exception ex) { - LOG.error("Error caught during logging audit events: " + ex); - } - - } - } - } - - - /** - * Constructor. - * - * @param auditLogger the audit logger to extend - */ - @Inject - public BufferedAuditLogger(@Named(InnerLogger) AuditLogger auditLogger, Configuration configuration) { - this.bufferCapacity = configuration.getBufferedAuditLoggerCapacity(); - this.capacityWaterMark = 0.2; // 20 percent of full capacity - - this.auditEventWorkQueue = new LinkedBlockingQueue<>(bufferCapacity); - this.auditLogger = auditLogger; - - this.pool = Executors.newSingleThreadExecutor(); - pool.execute(new Consumer()); - - } - - - /** - * Logs audit log events - * - * @param event - */ - @Override - public void log(final AuditEvent event) { - - try { - - this.auditEventWorkQueue.put(event); - - int remainingCapacity = this.auditEventWorkQueue.remainingCapacity(); - - LOG.debug("Work queue load: [{}/{}]", bufferCapacity - remainingCapacity, bufferCapacity); - - if (remainingCapacity < bufferCapacity * capacityWaterMark) - LOG.warn("Work queue remaining capacity: {} is below {}%", remainingCapacity, capacityWaterMark * 100); - - } catch (InterruptedException ex) { - LOG.error("Interrupted while waiting to queue audit event: " + event.getAuditMessage()); - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/0a1e572e/ambari-server/src/test/java/org/apache/ambari/server/audit/BufferedAuditLoggerTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/audit/BufferedAuditLoggerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/audit/BufferedAuditLoggerTest.java deleted file mode 100644 index 445c339..0000000 --- a/ambari-server/src/test/java/org/apache/ambari/server/audit/BufferedAuditLoggerTest.java +++ /dev/null @@ -1,174 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.server.audit; - -import java.util.Collections; -import java.util.List; - -import org.apache.ambari.server.audit.event.AuditEvent; -import org.apache.ambari.server.audit.event.OperationStatusAuditEvent; -import org.apache.ambari.server.configuration.Configuration; -import org.easymock.Capture; -import org.easymock.EasyMock; -import org.easymock.EasyMockRule; -import org.easymock.Mock; -import org.easymock.MockType; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - -import com.google.common.collect.ImmutableList; - -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.capture; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.newCapture; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.reset; -import static org.easymock.EasyMock.verify; -import static org.hamcrest.core.IsEqual.equalTo; -import static org.junit.Assert.assertThat; - -public class BufferedAuditLoggerTest { - - @Rule - public EasyMockRule mocks = new EasyMockRule(this); - - @Mock(type = MockType.NICE) - private AuditEvent auditEvent; - - @Mock(type = MockType.STRICT) - private AuditLogger auditLogger; - - @Mock(type = MockType.STRICT) - private Configuration configuration; - - - @Before - public void setUp() throws Exception { - reset(auditEvent, auditLogger); - - } - - @Test(timeout = 300) - public void testLog() throws Exception { - // Given - Capture<AuditEvent> capturedArgument = newCapture(); - auditLogger.log(capture(capturedArgument)); - - EasyMock.expect(configuration.getBufferedAuditLoggerCapacity()).andReturn(50); - replay(configuration); - - BufferedAuditLogger bufferedAuditLogger = new BufferedAuditLogger(auditLogger, configuration); - - replay(auditLogger, auditEvent); - - - // When - bufferedAuditLogger.log(auditEvent); - - Thread.sleep(100); - // Then - verify(auditLogger, configuration); - - - assertThat(capturedArgument.getValue(), equalTo(auditEvent)); - } - - @Test(timeout = 300) - public void testConsumeAuditEventsFromInternalBuffer() throws Exception { - // Given - EasyMock.expect(configuration.getBufferedAuditLoggerCapacity()).andReturn(5); - replay(configuration); - BufferedAuditLogger bufferedAuditLogger = new BufferedAuditLogger(auditLogger, configuration); - - List<AuditEvent> auditEvents = Collections.nCopies(50, auditEvent); - - auditLogger.log((AuditEvent)anyObject(AuditEvent.class)); - expectLastCall().times(50); - - replay(auditLogger, auditEvent); - - // When - for (AuditEvent event : auditEvents) { - bufferedAuditLogger.log(event); - } - - // Then - while (!bufferedAuditLogger.auditEventWorkQueue.isEmpty()) { - Thread.sleep(100); - } - - verify(auditLogger, auditEvent, configuration); - } - - @Test(timeout = 3000) - public void testMultipleProducersLogging() throws Exception { - // Given - int nProducers = 100; - - EasyMock.expect(configuration.getBufferedAuditLoggerCapacity()).andReturn(10000); - replay(configuration); - - final BufferedAuditLogger bufferedAuditLogger = new BufferedAuditLogger(new AuditLoggerDefaultImpl(), configuration); - - ImmutableList.Builder<Thread> producersBuilder = ImmutableList.builder(); - - for (int i = 0; i < nProducers; i++) { - final Integer reqId = i * 10000; - final AuditEvent event = - OperationStatusAuditEvent.builder() - .withStatus("IN PROGRESS") - .withTimestamp(System.currentTimeMillis()) - .withRequestId(reqId.toString()) - .build(); - - producersBuilder.add(new Thread(new Runnable() { - final int nAuditEventsPerProducer = 100; - - @Override - public void run() { - for (int j = 0; j < nAuditEventsPerProducer; j++) { - bufferedAuditLogger.log(event); - } - - } - } - - )); - } - - List<Thread> producers = producersBuilder.build(); - - - - // When - for (Thread producer : producers) { - producer.start(); // nProducers threads creating nAuditEventsPerProducer events each in parallel - } - - // Then - while (!bufferedAuditLogger.auditEventWorkQueue.isEmpty()) { - Thread.sleep(100); - } - - verify(configuration); - - } -}
