This is an automated email from the ASF dual-hosted git repository.

amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a57f92  ATLAS-3878: Notifications: Improve Memory Usage for HBase 
Audits Writing
4a57f92 is described below

commit 4a57f92f649e389b8f5ef653394a5f091f1a36de
Author: Ashutosh Mestry <[email protected]>
AuthorDate: Fri Jul 31 08:54:04 2020 -0700

    ATLAS-3878: Notifications: Improve Memory Usage for HBase Audits Writing
---
 .../java/org/apache/atlas/AtlasConfiguration.java  |   1 +
 .../java/org/apache/atlas/model/Clearable.java     |  22 +++
 .../atlas/model/audit/EntityAuditEventV2.java      |  16 ++-
 .../org/apache/atlas/utils/FixedBufferList.java    |  87 ++++++++++++
 .../apache/atlas/utils/FixedBufferListTest.java    | 146 +++++++++++++++++++
 .../repository/audit/EntityAuditListenerV2.java    | 157 +++++++++++----------
 6 files changed, 351 insertions(+), 78 deletions(-)

diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java 
b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index a942b9f..1c79158 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -47,6 +47,7 @@ public enum AtlasConfiguration {
     
NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled",
 true),
     
NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds",
 15 * 60),
     
NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS("atlas.notification.split.message.buffer.purge.interval.seconds",
 5 * 60),
+    
NOTIFICATION_FIXED_BUFFER_ITEMS_INCREMENT_COUNT("atlas.notification.fixed.buffer.items.increment.count",
 10),
 
     
NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF("atlas.notification.consumer.create.shell.entity.for.non-existing.ref",
 true),
     
REST_API_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF("atlas.rest.create.shell.entity.for.non-existing.ref",
 false),
diff --git a/intg/src/main/java/org/apache/atlas/model/Clearable.java 
b/intg/src/main/java/org/apache/atlas/model/Clearable.java
new file mode 100644
index 0000000..5f9f426
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/Clearable.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model;
+
+public interface Clearable {
+    void clear();
+}
diff --git 
a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java 
b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
index 63116d4..c37e282 100644
--- a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
+++ b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.model.Clearable;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntityHeader;
 import org.apache.atlas.type.AtlasType;
@@ -44,7 +45,7 @@ import static 
org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditType.EN
 @JsonIgnoreProperties(ignoreUnknown=true)
 @XmlRootElement
 @XmlAccessorType(XmlAccessType.PROPERTY)
-public class EntityAuditEventV2 implements Serializable {
+public class EntityAuditEventV2 implements Serializable, Clearable {
     public enum EntityAuditType { ENTITY_AUDIT_V1, ENTITY_AUDIT_V2 }
 
     public enum EntityAuditActionV2 {
@@ -255,6 +256,19 @@ public class EntityAuditEventV2 implements Serializable {
         return ret;
     }
 
+    @JsonIgnore
+    @Override
+    public void clear() {
+        entityId = null;
+        timestamp = 0L;
+        user = null;
+        action = null;
+        details = null;
+        eventKey = null;
+        entity = null;
+        type = null;
+    }
+
     private String getJsonPartFromDetails() {
         String ret = null;
         if(StringUtils.isNotEmpty(details)) {
diff --git a/intg/src/main/java/org/apache/atlas/utils/FixedBufferList.java 
b/intg/src/main/java/org/apache/atlas/utils/FixedBufferList.java
new file mode 100644
index 0000000..cf6ec02
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/utils/FixedBufferList.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.utils;
+
+import org.apache.atlas.model.Clearable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FixedBufferList<T extends Clearable> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(FixedBufferList.class);
+
+    private final Class<T> itemClass;
+    private final ArrayList<T> buffer;
+    private final int incrementCapacityBy;
+
+    private int length;
+
+    public FixedBufferList(Class<T> clazz) {
+        this(clazz, 1);
+    }
+
+    public FixedBufferList(Class<T> clazz, int incrementCapacityBy) {
+        this.incrementCapacityBy = (incrementCapacityBy <= 0 ? 1 : 
incrementCapacityBy);
+        this.itemClass = clazz;
+        this.buffer = new ArrayList<>();
+    }
+
+    public T next() {
+        request(length + 1);
+        return buffer.get(length++);
+    }
+
+    public List<T> toList() {
+        return this.buffer.subList(0, this.length);
+    }
+
+    public void reset() {
+        for (int i = 0; i < buffer.size(); i++) {
+            buffer.get(i).clear();
+        }
+
+        this.length = 0;
+    }
+
+    private void request(int requestedCapacity) {
+        if (requestedCapacity <= this.buffer.size()) {
+            return;
+        }
+
+        int oldCapacity = this.buffer.size();
+        int newCapacity = oldCapacity + this.incrementCapacityBy;
+        this.buffer.ensureCapacity(newCapacity);
+        instantiateItems(oldCapacity, newCapacity);
+
+        LOG.info("FixedBufferList: Requested: {} From: {} To:{}", 
requestedCapacity, oldCapacity, newCapacity);
+    }
+
+    private void instantiateItems(int startIndex, int maxSize) {
+        for (int i = startIndex; i < maxSize; i++) {
+            try {
+                this.buffer.add(itemClass.newInstance());
+            } catch (InstantiationException e) {
+                LOG.error("FixedBufferList: InstantiationException: 
Instantiation failed!", e);
+            } catch (IllegalAccessException e) {
+                LOG.error("FixedBufferList: IllegalAccessException: 
Instantiation failed!", e);
+            }
+        }
+    }
+}
diff --git a/intg/src/test/java/org/apache/atlas/utils/FixedBufferListTest.java 
b/intg/src/test/java/org/apache/atlas/utils/FixedBufferListTest.java
new file mode 100644
index 0000000..19bfe49
--- /dev/null
+++ b/intg/src/test/java/org/apache/atlas/utils/FixedBufferListTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.utils;
+
+import org.apache.atlas.model.Clearable;
+import org.apache.commons.lang.StringUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+public class FixedBufferListTest {
+    private String STR_PREFIX = "str:%s";
+
+    public static class Spying implements Clearable {
+        public static AtomicInteger callsToCtor = new AtomicInteger();
+        public static AtomicInteger callsToClear = new AtomicInteger();
+
+        private int anInt;
+        private String aString;
+        private long aLong;
+
+        public Spying() {
+            callsToCtor.incrementAndGet();
+        }
+
+        @Override
+        public void clear() {
+            callsToClear.incrementAndGet();
+
+            anInt = 0;
+            aString = StringUtils.EMPTY;
+            aLong = 0;
+        }
+
+        public static void resetCounters() {
+            Spying.callsToCtor.set(0);
+            Spying.callsToClear.set(0);
+        }
+    }
+
+    private static class SpyingFixedBufferList extends FixedBufferList<Spying> 
{
+        public SpyingFixedBufferList(int incrementCapacityFactor) {
+            super(Spying.class, incrementCapacityFactor);
+        }
+    }
+
+    @Test
+    public void instantiateListWithParameterizedClass() {
+        FixedBufferList<Spying> list = new FixedBufferList<>(Spying.class);
+        assertNotNull(list);
+    }
+
+    @Test
+    public void createdBasedOnInitialSize() {
+        Spying.resetCounters();
+
+        int incrementByFactor = 2;
+        SpyingFixedBufferList fixedBufferList = new 
SpyingFixedBufferList(incrementByFactor);
+        addElements(fixedBufferList, 0, 3);
+
+        List<Spying> list = fixedBufferList.toList();
+        assertSpyingList(list, 3);
+        assertEquals(Spying.callsToCtor.get(), incrementByFactor * 2);
+    }
+
+    @Test (dependsOnMethods = "createdBasedOnInitialSize")
+    public void bufferIncreasesIfNeeded() {
+        Spying.resetCounters();
+
+        int incrementSizeBy = 5;
+        SpyingFixedBufferList fixedBufferList = new 
SpyingFixedBufferList(incrementSizeBy);
+        addElements(fixedBufferList, 0, incrementSizeBy);
+        List<Spying> spyings = fixedBufferList.toList();
+        assertEquals(spyings.size(), incrementSizeBy);
+        assertEquals(Spying.callsToCtor.get(), incrementSizeBy);
+
+        fixedBufferList.reset();
+        addElements(fixedBufferList, 0, incrementSizeBy * 2);
+        spyings = fixedBufferList.toList();
+        assertEquals(Spying.callsToCtor.get(), incrementSizeBy * 2);
+        assertSpyingList(spyings, incrementSizeBy * 2);
+        assertEquals(Spying.callsToClear.get(), incrementSizeBy);
+    }
+
+    @Test
+    public void retrieveEmptyList() {
+        int size = 5;
+        SpyingFixedBufferList fixedBufferList = new 
SpyingFixedBufferList(size);
+
+        List<Spying> list = fixedBufferList.toList();
+        assertEquals(list.size(), 0);
+
+        addElements(fixedBufferList, 0, 3);
+        list = fixedBufferList.toList();
+        assertEquals(list.size(), 3);
+    }
+
+    private void assertSpyingList(List<Spying> list, int expectedSize) {
+        assertEquals(list.size(), expectedSize);
+        for (int i1 = 0; i1 < list.size(); i1++) {
+            Assert.assertNotNull(list.get(i1));
+            assertSpying(list.get(i1), i1);
+        }
+    }
+
+    private void assertSpying(Spying spying, int i) {
+        assertEquals(spying.aLong, i);
+        assertEquals(spying.anInt, i);
+        assertEquals(spying.aString, String.format(STR_PREFIX, i));
+    }
+
+    private Spying createSpyingClass(Spying spying, int i) {
+        spying.aLong = i;
+        spying.anInt = i;
+        spying.aString = String.format(STR_PREFIX, i);
+
+        return spying;
+    }
+
+    private void addElements(SpyingFixedBufferList fixedBufferList, int 
startIndex, int numElements) {
+        for (int i = startIndex; i < (startIndex + numElements); i++) {
+            Spying spyForUpdate = fixedBufferList.next();
+            createSpyingClass(spyForUpdate, i);
+        }
+    }
+}
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
 
b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
index 79527ac..ca6f373 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
@@ -17,6 +17,7 @@
  */
 package org.apache.atlas.repository.audit;
 
+import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.EntityAuditEvent.EntityAuditAction;
 import org.apache.atlas.RequestContext;
 import org.apache.atlas.model.audit.EntityAuditEventV2;
@@ -36,6 +37,7 @@ import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.utils.AtlasJson;
 import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
+import org.apache.atlas.utils.FixedBufferList;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -45,8 +47,6 @@ import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -74,6 +74,9 @@ import static 
org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV
 @Component
 public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
     private static final Logger LOG = 
LoggerFactory.getLogger(EntityAuditListenerV2.class);
+    private static final ThreadLocal<FixedBufferList<EntityAuditEventV2>> 
AUDIT_EVENTS_BUFFER =
+            ThreadLocal.withInitial(() -> new 
FixedBufferList<>(EntityAuditEventV2.class,
+                    
AtlasConfiguration.NOTIFICATION_FIXED_BUFFER_ITEMS_INCREMENT_COUNT.getInt()));
 
     private final EntityAuditRepository  auditRepository;
     private final AtlasTypeRegistry      typeRegistry;
@@ -90,15 +93,12 @@ public class EntityAuditListenerV2 implements 
EntityChangeListenerV2 {
     public void onEntitiesAdded(List<AtlasEntity> entities, boolean isImport) 
throws AtlasBaseException {
         MetricRecorder metric = 
RequestContext.get().startMetricRecord("entityAudit");
 
-        List<EntityAuditEventV2> events = new ArrayList<>();
-
+        FixedBufferList<EntityAuditEventV2> entitiesAdded = 
getAuditEventsList();
         for (AtlasEntity entity : entities) {
-            EntityAuditEventV2 event = createEvent(entity, isImport ? 
ENTITY_IMPORT_CREATE : ENTITY_CREATE);
-
-            events.add(event);
+            createEvent(entitiesAdded.next(), entity, isImport ? 
ENTITY_IMPORT_CREATE : ENTITY_CREATE);
         }
 
-        auditRepository.putEventsV2(events);
+        auditRepository.putEventsV2(entitiesAdded.toList());
 
         RequestContext.get().endMetricRecord(metric);
     }
@@ -107,15 +107,12 @@ public class EntityAuditListenerV2 implements 
EntityChangeListenerV2 {
     public void onEntitiesUpdated(List<AtlasEntity> entities, boolean 
isImport) throws AtlasBaseException {
         MetricRecorder metric = 
RequestContext.get().startMetricRecord("entityAudit");
 
-        List<EntityAuditEventV2> events = new ArrayList<>();
-
+        FixedBufferList<EntityAuditEventV2> updatedEvents = 
getAuditEventsList();
         for (AtlasEntity entity : entities) {
-            EntityAuditEventV2 event = createEvent(entity, isImport ? 
ENTITY_IMPORT_UPDATE : ENTITY_UPDATE);
-
-            events.add(event);
+            createEvent(updatedEvents.next(), entity, isImport ? 
ENTITY_IMPORT_UPDATE : ENTITY_UPDATE);
         }
 
-        auditRepository.putEventsV2(events);
+        auditRepository.putEventsV2(updatedEvents.toList());
 
         RequestContext.get().endMetricRecord(metric);
     }
@@ -124,15 +121,12 @@ public class EntityAuditListenerV2 implements 
EntityChangeListenerV2 {
     public void onEntitiesDeleted(List<AtlasEntity> entities, boolean 
isImport) throws AtlasBaseException {
         MetricRecorder metric = 
RequestContext.get().startMetricRecord("entityAudit");
 
-        List<EntityAuditEventV2> events = new ArrayList<>();
-
+        FixedBufferList<EntityAuditEventV2> deletedEntities = 
getAuditEventsList();
         for (AtlasEntity entity : entities) {
-            EntityAuditEventV2 event = createEvent(entity, isImport ? 
ENTITY_IMPORT_DELETE : ENTITY_DELETE, "Deleted entity");
-
-            events.add(event);
+            createEvent(deletedEntities.next(), entity, isImport ? 
ENTITY_IMPORT_DELETE : ENTITY_DELETE, "Deleted entity");
         }
 
-        auditRepository.putEventsV2(events);
+        auditRepository.putEventsV2(deletedEntities.toList());
 
         RequestContext.get().endMetricRecord(metric);
     }
@@ -141,14 +135,12 @@ public class EntityAuditListenerV2 implements 
EntityChangeListenerV2 {
     public void onEntitiesPurged(List<AtlasEntity> entities) throws 
AtlasBaseException {
         MetricRecorder metric = 
RequestContext.get().startMetricRecord("entityAudit");
 
-        List<EntityAuditEventV2> events = new ArrayList<>();
-
+        FixedBufferList<EntityAuditEventV2> eventsPurged = 
getAuditEventsList();
         for (AtlasEntity entity : entities) {
-            EntityAuditEventV2 event = createEvent(entity, ENTITY_PURGE);
-            events.add(event);
+            createEvent(eventsPurged.next(), entity, ENTITY_PURGE);
         }
 
-        auditRepository.putEventsV2(events);
+        auditRepository.putEventsV2(eventsPurged.toList());
 
         RequestContext.get().endMetricRecord(metric);
     }
@@ -158,17 +150,16 @@ public class EntityAuditListenerV2 implements 
EntityChangeListenerV2 {
         if (CollectionUtils.isNotEmpty(classifications)) {
             MetricRecorder metric = 
RequestContext.get().startMetricRecord("entityAudit");
 
-            List<EntityAuditEventV2> events = new ArrayList<>();
-
+            FixedBufferList<EntityAuditEventV2> classificationsAdded = 
getAuditEventsList();
             for (AtlasClassification classification : classifications) {
                 if (entity.getGuid().equals(classification.getEntityGuid())) {
-                    events.add(createEvent(entity, CLASSIFICATION_ADD, "Added 
classification: " + AtlasType.toJson(classification)));
+                    createEvent(classificationsAdded.next(), entity, 
CLASSIFICATION_ADD, "Added classification: " + 
AtlasType.toJson(classification));
                 } else {
-                    events.add(createEvent(entity, 
PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + 
AtlasType.toJson(classification)));
+                    createEvent(classificationsAdded.next(), entity, 
PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + 
AtlasType.toJson(classification));
                 }
             }
 
-            auditRepository.putEventsV2(events);
+            auditRepository.putEventsV2(classificationsAdded.toList());
 
             RequestContext.get().endMetricRecord(metric);
         }
@@ -177,20 +168,20 @@ public class EntityAuditListenerV2 implements 
EntityChangeListenerV2 {
     @Override
     public void onClassificationsAdded(List<AtlasEntity> entities, 
List<AtlasClassification> classifications) throws AtlasBaseException {
         if (CollectionUtils.isNotEmpty(classifications)) {
-            MetricRecorder           metric = 
RequestContext.get().startMetricRecord("entityAudit");
-            List<EntityAuditEventV2> events = Collections.synchronizedList(new 
ArrayList<>());
+            MetricRecorder metric = 
RequestContext.get().startMetricRecord("entityAudit");
+            FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
 
             for (AtlasClassification classification : classifications) {
                 for (AtlasEntity entity : entities) {
                     if 
(entity.getGuid().equals(classification.getEntityGuid())) {
-                        events.add(createEvent(entity, CLASSIFICATION_ADD, 
"Added classification: " + AtlasType.toJson(classification)));
+                        createEvent(events.next(), entity, CLASSIFICATION_ADD, 
"Added classification: " + AtlasType.toJson(classification));
                     } else {
-                        events.add(createEvent(entity, 
PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + 
AtlasType.toJson(classification)));
+                        createEvent(events.next(), entity, 
PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + 
AtlasType.toJson(classification));
                     }
                 }
             }
 
-            auditRepository.putEventsV2(events);
+            auditRepository.putEventsV2(events.toList());
 
             RequestContext.get().endMetricRecord(metric);
         }
@@ -201,24 +192,24 @@ public class EntityAuditListenerV2 implements 
EntityChangeListenerV2 {
         if (CollectionUtils.isNotEmpty(classifications)) {
             MetricRecorder metric = 
RequestContext.get().startMetricRecord("entityAudit");
 
-            List<EntityAuditEventV2> events = new ArrayList<>();
-            String                   guid   = entity.getGuid();
+            FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
+            String guid = entity.getGuid();
 
             for (AtlasClassification classification : classifications) {
                 if (guid.equals(classification.getEntityGuid())) {
-                    events.add(createEvent(entity, CLASSIFICATION_UPDATE, 
"Updated classification: " + AtlasType.toJson(classification)));
+                    createEvent(events.next(), entity, CLASSIFICATION_UPDATE, 
"Updated classification: " + AtlasType.toJson(classification));
                 } else {
                     if (isPropagatedClassificationAdded(guid, classification)) 
{
-                        events.add(createEvent(entity, 
PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + 
AtlasType.toJson(classification)));
+                        createEvent(events.next(), entity, 
PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " + 
AtlasType.toJson(classification));
                     } else if (isPropagatedClassificationDeleted(guid, 
classification)) {
-                        events.add(createEvent(entity, 
PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + 
classification.getTypeName()));
+                        createEvent(events.next(), entity, 
PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + 
classification.getTypeName());
                     } else {
-                        events.add(createEvent(entity, 
PROPAGATED_CLASSIFICATION_UPDATE, "Updated propagated classification: " + 
AtlasType.toJson(classification)));
+                        createEvent(events.next(), entity, 
PROPAGATED_CLASSIFICATION_UPDATE, "Updated propagated classification: " + 
AtlasType.toJson(classification));
                     }
                 }
             }
 
-            auditRepository.putEventsV2(events);
+            auditRepository.putEventsV2(events.toList());
 
             RequestContext.get().endMetricRecord(metric);
         }
@@ -227,19 +218,19 @@ public class EntityAuditListenerV2 implements 
EntityChangeListenerV2 {
     @Override
     public void onClassificationsDeleted(AtlasEntity entity, 
List<AtlasClassification> classifications) throws AtlasBaseException {
         if (CollectionUtils.isNotEmpty(classifications)) {
-            MetricRecorder metric = 
RequestContext.get().startMetricRecord("entityAudit");
+            MetricRecorder metric = 
RequestContext.get().startMetricRecord("onClassificationsDeleted");
 
-            List<EntityAuditEventV2> events = new ArrayList<>();
+            FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
 
             for (AtlasClassification classification : classifications) {
                 if (StringUtils.equals(entity.getGuid(), 
classification.getEntityGuid())) {
-                    events.add(createEvent(entity, CLASSIFICATION_DELETE, 
"Deleted classification: " + classification.getTypeName()));
+                    createEvent(events.next(), entity, CLASSIFICATION_DELETE, 
"Deleted classification: " + classification.getTypeName());
                 } else {
-                    events.add(createEvent(entity, 
PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + 
classification.getTypeName()));
+                    createEvent(events.next(), entity, 
PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + 
classification.getTypeName());
                 }
             }
 
-            auditRepository.putEventsV2(events);
+            auditRepository.putEventsV2(events.toList());
 
             RequestContext.get().endMetricRecord(metric);
         }
@@ -248,20 +239,20 @@ public class EntityAuditListenerV2 implements 
EntityChangeListenerV2 {
     @Override
     public void onClassificationsDeleted(List<AtlasEntity> entities, 
List<AtlasClassification> classifications) throws AtlasBaseException {
         if (CollectionUtils.isNotEmpty(classifications) && 
CollectionUtils.isNotEmpty(entities)) {
-            MetricRecorder           metric = 
RequestContext.get().startMetricRecord("entityAudit");
-            List<EntityAuditEventV2> events = Collections.synchronizedList(new 
ArrayList<>());
+            MetricRecorder metric = 
RequestContext.get().startMetricRecord("onClassificationsDeleted");
+            FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
 
             for (AtlasClassification classification : classifications) {
                 for (AtlasEntity entity : entities) {
                     if (StringUtils.equals(entity.getGuid(), 
classification.getEntityGuid())) {
-                        events.add(createEvent(entity, CLASSIFICATION_DELETE, 
"Deleted classification: " + classification.getTypeName()));
+                        createEvent(events.next(), entity, 
CLASSIFICATION_DELETE, "Deleted classification: " + 
classification.getTypeName());
                     } else {
-                        events.add(createEvent(entity, 
PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + 
classification.getTypeName()));
+                        createEvent(events.next(), entity, 
PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " + 
classification.getTypeName());
                     }
                 }
             }
 
-            auditRepository.putEventsV2(events);
+            auditRepository.putEventsV2(events.toList());
 
             RequestContext.get().endMetricRecord(metric);
         }
@@ -270,19 +261,19 @@ public class EntityAuditListenerV2 implements 
EntityChangeListenerV2 {
     @Override
     public void onTermAdded(AtlasGlossaryTerm term, List<AtlasRelatedObjectId> 
entities) throws AtlasBaseException {
         if (term != null && CollectionUtils.isNotEmpty(entities)) {
-            MetricRecorder metric = 
RequestContext.get().startMetricRecord("entityAudit");
+            MetricRecorder metric = 
RequestContext.get().startMetricRecord("onTermAdded");
 
-            List<EntityAuditEventV2> events = new ArrayList<>();
+            FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
 
             for (AtlasRelatedObjectId relatedObjectId : entities) {
                 AtlasEntity entity = 
instanceConverter.getAndCacheEntity(relatedObjectId.getGuid());
 
                 if (entity != null) {
-                    events.add(createEvent(entity, TERM_ADD, "Added term: " + 
term.toAuditString()));
+                    createEvent(events.next(), entity, TERM_ADD, "Added term: 
" + term.toAuditString());
                 }
             }
 
-            auditRepository.putEventsV2(events);
+            auditRepository.putEventsV2(events.toList());
 
             RequestContext.get().endMetricRecord(metric);
         }
@@ -291,19 +282,19 @@ public class EntityAuditListenerV2 implements 
EntityChangeListenerV2 {
     @Override
     public void onTermDeleted(AtlasGlossaryTerm term, 
List<AtlasRelatedObjectId> entities) throws AtlasBaseException {
         if (term != null && CollectionUtils.isNotEmpty(entities)) {
-            MetricRecorder metric = 
RequestContext.get().startMetricRecord("entityAudit");
+            MetricRecorder metric = 
RequestContext.get().startMetricRecord("onTermDeleted");
 
-            List<EntityAuditEventV2> events = new ArrayList<>();
+            FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
 
             for (AtlasRelatedObjectId relatedObjectId : entities) {
                 AtlasEntity entity = 
instanceConverter.getAndCacheEntity(relatedObjectId.getGuid());
 
                 if (entity != null) {
-                    events.add(createEvent(entity, TERM_DELETE, "Deleted term: 
" + term.toAuditString()));
+                    createEvent(events.next(), entity, TERM_DELETE, "Deleted 
term: " + term.toAuditString());
                 }
             }
 
-            auditRepository.putEventsV2(events);
+            auditRepository.putEventsV2(events.toList());
 
             RequestContext.get().endMetricRecord(metric);
         }
@@ -312,15 +303,15 @@ public class EntityAuditListenerV2 implements 
EntityChangeListenerV2 {
     @Override
     public void onLabelsAdded(AtlasEntity entity, Set<String> labels) throws 
AtlasBaseException {
         if (CollectionUtils.isNotEmpty(labels)) {
-            MetricRecorder metric = 
RequestContext.get().startMetricRecord("entityAudit");
+            MetricRecorder metric = 
RequestContext.get().startMetricRecord("onLabelsAdded");
 
-            List<EntityAuditEventV2> events = new ArrayList<>();
+            FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
 
             String addedLabels = StringUtils.join(labels, " ");
 
-            events.add(createEvent(entity, LABEL_ADD, "Added labels: " + 
addedLabels));
+            createEvent(events.next(), entity, LABEL_ADD, "Added labels: " + 
addedLabels);
 
-            auditRepository.putEventsV2(events);
+            auditRepository.putEventsV2(events.toList());
 
             RequestContext.get().endMetricRecord(metric);
         }
@@ -329,15 +320,15 @@ public class EntityAuditListenerV2 implements 
EntityChangeListenerV2 {
     @Override
     public void onLabelsDeleted(AtlasEntity entity, Set<String> labels) throws 
AtlasBaseException {
         if (CollectionUtils.isNotEmpty(labels)) {
-            MetricRecorder metric = 
RequestContext.get().startMetricRecord("entityAudit");
+            MetricRecorder metric = 
RequestContext.get().startMetricRecord("onLabelsDeleted");
 
-            List<EntityAuditEventV2> events = new ArrayList<>();
+            FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
 
             String deletedLabels = StringUtils.join(labels, " ");
 
-            events.add(createEvent(entity, LABEL_DELETE, "Deleted labels: " + 
deletedLabels));
+            createEvent(events.next(), entity, LABEL_DELETE, "Deleted labels: 
" + deletedLabels);
 
-            auditRepository.putEventsV2(events);
+            auditRepository.putEventsV2(events.toList());
 
             RequestContext.get().endMetricRecord(metric);
         }
@@ -376,33 +367,38 @@ public class EntityAuditListenerV2 implements 
EntityChangeListenerV2 {
         if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
             MetricRecorder metric = 
RequestContext.get().startMetricRecord("entityAudit");
 
-            List<EntityAuditEventV2> auditEvents = new ArrayList<>();
+            FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
 
             for (Map.Entry<String, Map<String, Object>> entry : 
updatedBusinessAttributes.entrySet()) {
                 String              bmName     = entry.getKey();
                 Map<String, Object> attributes = entry.getValue();
                 String              details    = AtlasJson.toJson(new 
AtlasStruct(bmName, attributes));
-                EntityAuditEventV2  auditEvent = createEvent(entity, 
BUSINESS_ATTRIBUTE_UPDATE, "Updated business attributes: " + details);
 
-                auditEvents.add(auditEvent);
+                createEvent(events.next(), entity, BUSINESS_ATTRIBUTE_UPDATE, 
"Updated business attributes: " + details);
             }
 
-            auditRepository.putEventsV2(auditEvents);
+            auditRepository.putEventsV2(events.toList());
 
             RequestContext.get().endMetricRecord(metric);
         }
     }
 
 
-    private EntityAuditEventV2 createEvent(AtlasEntity entity, 
EntityAuditActionV2 action, String details) {
-        return new EntityAuditEventV2(entity.getGuid(), 
RequestContext.get().getRequestTime(),
-                                      RequestContext.get().getUser(), action, 
details, entity);
+    private EntityAuditEventV2 createEvent(EntityAuditEventV2 
entityAuditEventV2, AtlasEntity entity, EntityAuditActionV2 action, String 
details) {
+        entityAuditEventV2.setEntityId(entity.getGuid());
+        entityAuditEventV2.setTimestamp(RequestContext.get().getRequestTime());
+        entityAuditEventV2.setUser(RequestContext.get().getUser());
+        entityAuditEventV2.setAction(action);
+        entityAuditEventV2.setDetails(details);
+        entityAuditEventV2.setEntity(entity);
+
+        return entityAuditEventV2;
     }
 
-    private EntityAuditEventV2 createEvent(AtlasEntity entity, 
EntityAuditActionV2 action) {
+    private EntityAuditEventV2 createEvent(EntityAuditEventV2 event, 
AtlasEntity entity, EntityAuditActionV2 action) {
         String detail = getAuditEventDetail(entity, action);
 
-        return createEvent(entity, action, detail);
+        return createEvent(event, entity, action, detail);
     }
 
     private String getAuditEventDetail(AtlasEntity entity, EntityAuditActionV2 
action) {
@@ -619,4 +615,11 @@ public class EntityAuditListenerV2 implements 
EntityChangeListenerV2 {
 
         return ret;
     }
+
+    private FixedBufferList<EntityAuditEventV2> getAuditEventsList() {
+        FixedBufferList<EntityAuditEventV2> ret = AUDIT_EVENTS_BUFFER.get();
+        ret.reset();
+        return ret;
+
+    }
 }

Reply via email to