This is an automated email from the ASF dual-hosted git repository.
amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 4a57f92 ATLAS-3878: Notifications: Improve Memory Usage for HBase
Audits Writing
4a57f92 is described below
commit 4a57f92f649e389b8f5ef653394a5f091f1a36de
Author: Ashutosh Mestry <[email protected]>
AuthorDate: Fri Jul 31 08:54:04 2020 -0700
ATLAS-3878: Notifications: Improve Memory Usage for HBase Audits Writing
---
.../java/org/apache/atlas/AtlasConfiguration.java | 1 +
.../java/org/apache/atlas/model/Clearable.java | 22 +++
.../atlas/model/audit/EntityAuditEventV2.java | 16 ++-
.../org/apache/atlas/utils/FixedBufferList.java | 87 ++++++++++++
.../apache/atlas/utils/FixedBufferListTest.java | 146 +++++++++++++++++++
.../repository/audit/EntityAuditListenerV2.java | 157 +++++++++++----------
6 files changed, 351 insertions(+), 78 deletions(-)
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index a942b9f..1c79158 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -47,6 +47,7 @@ public enum AtlasConfiguration {
NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled",
true),
NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds",
15 * 60),
NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS("atlas.notification.split.message.buffer.purge.interval.seconds",
5 * 60),
+
NOTIFICATION_FIXED_BUFFER_ITEMS_INCREMENT_COUNT("atlas.notification.fixed.buffer.items.increment.count",
10),
NOTIFICATION_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF("atlas.notification.consumer.create.shell.entity.for.non-existing.ref",
true),
REST_API_CREATE_SHELL_ENTITY_FOR_NON_EXISTING_REF("atlas.rest.create.shell.entity.for.non-existing.ref",
false),
diff --git a/intg/src/main/java/org/apache/atlas/model/Clearable.java
b/intg/src/main/java/org/apache/atlas/model/Clearable.java
new file mode 100644
index 0000000..5f9f426
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/model/Clearable.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model;
+
+/**
+ * Implemented by objects whose state can be wiped in place.
+ */
+public interface Clearable {
+    /**
+     * Resets the state held by this object.
+     */
+    void clear();
+}
diff --git
a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
index 63116d4..c37e282 100644
--- a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
+++ b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.model.Clearable;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.type.AtlasType;
@@ -44,7 +45,7 @@ import static
org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditType.EN
@JsonIgnoreProperties(ignoreUnknown=true)
@XmlRootElement
@XmlAccessorType(XmlAccessType.PROPERTY)
-public class EntityAuditEventV2 implements Serializable {
+public class EntityAuditEventV2 implements Serializable, Clearable {
public enum EntityAuditType { ENTITY_AUDIT_V1, ENTITY_AUDIT_V2 }
public enum EntityAuditActionV2 {
@@ -255,6 +256,19 @@ public class EntityAuditEventV2 implements Serializable {
return ret;
}
+    /**
+     * Blanks this event in place: {@code timestamp} becomes {@code 0} and
+     * {@code entityId}, {@code user}, {@code action}, {@code details},
+     * {@code eventKey}, {@code entity} and {@code type} become {@code null}.
+     */
+    @Override
+    @JsonIgnore
+    public void clear() {
+        // the only primitive among the fields reset here
+        this.timestamp = 0L;
+
+        // identification
+        this.entityId = null;
+        this.eventKey = null;
+        this.type     = null;
+
+        // who did what
+        this.user    = null;
+        this.action  = null;
+        this.details = null;
+
+        // payload
+        this.entity = null;
+    }
+
private String getJsonPartFromDetails() {
String ret = null;
if(StringUtils.isNotEmpty(details)) {
diff --git a/intg/src/main/java/org/apache/atlas/utils/FixedBufferList.java
b/intg/src/main/java/org/apache/atlas/utils/FixedBufferList.java
new file mode 100644
index 0000000..cf6ec02
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/utils/FixedBufferList.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.utils;
+
+import org.apache.atlas.model.Clearable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Grow-only pool of reusable {@link Clearable} items.
+ * <p>
+ * Items are created reflectively through the no-argument constructor of the
+ * class supplied at construction time, handed out one at a time by
+ * {@link #next()}, and recycled (not discarded) by {@link #reset()}. The
+ * backing buffer grows in steps of {@code incrementCapacityBy} items and is
+ * never shrunk by this class.
+ * <p>
+ * This class performs no synchronization of its own.
+ *
+ * @param <T> item type; must be instantiable through a no-argument constructor
+ */
+public class FixedBufferList<T extends Clearable> {
+    private static final Logger LOG = LoggerFactory.getLogger(FixedBufferList.class);
+
+    private final Class<T>     itemClass;
+    private final ArrayList<T> buffer;
+    private final int          incrementCapacityBy;
+
+    /** Number of items handed out by {@link #next()} since the last {@link #reset()}. */
+    private int length;
+
+    /**
+     * Creates a buffer that grows one item at a time.
+     *
+     * @param clazz class of the items to pool
+     */
+    public FixedBufferList(Class<T> clazz) {
+        this(clazz, 1);
+    }
+
+    /**
+     * Creates an empty buffer; no items are instantiated until {@link #next()} is called.
+     *
+     * @param clazz               class of the items to pool
+     * @param incrementCapacityBy number of items to create each time the buffer
+     *                            runs out; values {@code <= 0} are treated as {@code 1}
+     */
+    public FixedBufferList(Class<T> clazz, int incrementCapacityBy) {
+        this.incrementCapacityBy = (incrementCapacityBy <= 0 ? 1 : incrementCapacityBy);
+        this.itemClass           = clazz;
+        this.buffer              = new ArrayList<>();
+    }
+
+    /**
+     * Hands out the next unused item, growing the buffer first if necessary.
+     *
+     * @return a pooled item for the caller to populate
+     * @throws IllegalStateException if a new item had to be created and the
+     *                               item class could not be instantiated
+     */
+    public T next() {
+        request(length + 1);
+
+        return buffer.get(length++);
+    }
+
+    /**
+     * Returns the items handed out since the last {@link #reset()}.
+     * <p>
+     * The result is a {@code subList} view of the backing buffer, not a copy:
+     * its elements are cleared by the next {@link #reset()}, and the view must
+     * not be used after a later {@link #next()} call has grown the buffer.
+     *
+     * @return view of the first {@code length} items; empty if none were handed out
+     */
+    public List<T> toList() {
+        return this.buffer.subList(0, this.length);
+    }
+
+    /**
+     * Clears every item handed out since the previous reset and makes all
+     * pooled items available again. Pooled instances are kept for reuse.
+     */
+    public void reset() {
+        // Slots at index >= length have not been handed out since they were
+        // created or last cleared, so only the used prefix needs clearing.
+        for (int i = 0; i < length; i++) {
+            buffer.get(i).clear();
+        }
+
+        this.length = 0;
+    }
+
+    /**
+     * Ensures the buffer holds at least {@code requestedCapacity} items, adding
+     * one increment of new items when it does not. Callers in this class only
+     * ever ask for one more than the current length, so a single increment is
+     * always sufficient.
+     */
+    private void request(int requestedCapacity) {
+        if (requestedCapacity <= this.buffer.size()) {
+            return;
+        }
+
+        int oldCapacity = this.buffer.size();
+        int newCapacity = oldCapacity + this.incrementCapacityBy;
+
+        this.buffer.ensureCapacity(newCapacity);
+        instantiateItems(oldCapacity, newCapacity);
+
+        LOG.info("FixedBufferList: Requested: {} From: {} To:{}", requestedCapacity, oldCapacity, newCapacity);
+    }
+
+    /**
+     * Appends {@code maxSize - startIndex} freshly constructed items to the buffer.
+     *
+     * @throws IllegalStateException if the item class cannot be instantiated;
+     *                               failing here keeps the buffer size consistent
+     *                               with what {@link #next()} is about to read
+     */
+    private void instantiateItems(int startIndex, int maxSize) {
+        for (int i = startIndex; i < maxSize; i++) {
+            try {
+                this.buffer.add(itemClass.getDeclaredConstructor().newInstance());
+            } catch (ReflectiveOperationException e) {
+                throw new IllegalStateException("FixedBufferList: failed to instantiate " + itemClass.getName(), e);
+            }
+        }
+    }
+}
diff --git a/intg/src/test/java/org/apache/atlas/utils/FixedBufferListTest.java
b/intg/src/test/java/org/apache/atlas/utils/FixedBufferListTest.java
new file mode 100644
index 0000000..19bfe49
--- /dev/null
+++ b/intg/src/test/java/org/apache/atlas/utils/FixedBufferListTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.utils;
+
+import org.apache.atlas.model.Clearable;
+import org.apache.commons.lang.StringUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * Tests for {@link FixedBufferList}, driven by a {@link Clearable} test double
+ * that counts constructor and {@code clear()} invocations.
+ */
+public class FixedBufferListTest {
+    private static final String STR_PREFIX = "str:%s";
+
+    /**
+     * Test double whose static counters record, across all instances, how many
+     * times the constructor and {@link #clear()} have run.
+     */
+    public static class Spying implements Clearable {
+        public static final AtomicInteger callsToCtor  = new AtomicInteger();
+        public static final AtomicInteger callsToClear = new AtomicInteger();
+
+        private int    anInt;
+        private String aString;
+        private long   aLong;
+
+        public Spying() {
+            callsToCtor.incrementAndGet();
+        }
+
+        @Override
+        public void clear() {
+            callsToClear.incrementAndGet();
+
+            anInt   = 0;
+            aString = StringUtils.EMPTY;
+            aLong   = 0;
+        }
+
+        /** Zeroes both counters; tests that assert on them call this first. */
+        public static void resetCounters() {
+            callsToCtor.set(0);
+            callsToClear.set(0);
+        }
+    }
+
+    /** {@link FixedBufferList} bound to {@link Spying} so tests need not repeat the class token. */
+    private static class SpyingFixedBufferList extends FixedBufferList<Spying> {
+        public SpyingFixedBufferList(int incrementCapacityFactor) {
+            super(Spying.class, incrementCapacityFactor);
+        }
+    }
+
+    @Test
+    public void instantiateListWithParameterizedClass() {
+        FixedBufferList<Spying> list = new FixedBufferList<>(Spying.class);
+
+        assertNotNull(list);
+    }
+
+    @Test
+    public void createdBasedOnInitialSize() {
+        Spying.resetCounters();
+
+        int                   incrementByFactor = 2;
+        SpyingFixedBufferList fixedBufferList   = new SpyingFixedBufferList(incrementByFactor);
+
+        addElements(fixedBufferList, 0, 3);
+
+        List<Spying> list = fixedBufferList.toList();
+
+        assertSpyingList(list, 3);
+        // three items with an increment of two forces two growth steps: 2 + 2 constructions
+        assertEquals(Spying.callsToCtor.get(), incrementByFactor * 2);
+    }
+
+    // dependsOnMethods orders this test after the other one that asserts on the shared static counters
+    @Test (dependsOnMethods = "createdBasedOnInitialSize")
+    public void bufferIncreasesIfNeeded() {
+        Spying.resetCounters();
+
+        int                   incrementSizeBy = 5;
+        SpyingFixedBufferList fixedBufferList = new SpyingFixedBufferList(incrementSizeBy);
+
+        addElements(fixedBufferList, 0, incrementSizeBy);
+
+        List<Spying> spyings = fixedBufferList.toList();
+
+        assertEquals(spyings.size(), incrementSizeBy);
+        assertEquals(Spying.callsToCtor.get(), incrementSizeBy);
+
+        fixedBufferList.reset();
+        addElements(fixedBufferList, 0, incrementSizeBy * 2);
+        spyings = fixedBufferList.toList();
+
+        // the first five instances are reused, so only five more are constructed
+        assertEquals(Spying.callsToCtor.get(), incrementSizeBy * 2);
+        assertSpyingList(spyings, incrementSizeBy * 2);
+        // reset() ran once, while five items were in the buffer
+        assertEquals(Spying.callsToClear.get(), incrementSizeBy);
+    }
+
+    @Test
+    public void retrieveEmptyList() {
+        int                   size            = 5;
+        SpyingFixedBufferList fixedBufferList = new SpyingFixedBufferList(size);
+
+        List<Spying> list = fixedBufferList.toList();
+
+        assertEquals(list.size(), 0);
+
+        addElements(fixedBufferList, 0, 3);
+        list = fixedBufferList.toList();
+
+        assertEquals(list.size(), 3);
+    }
+
+    /** Asserts the list has the expected size and that element {@code i} carries the values written for {@code i}. */
+    private void assertSpyingList(List<Spying> list, int expectedSize) {
+        assertEquals(list.size(), expectedSize);
+
+        for (int i = 0; i < list.size(); i++) {
+            assertNotNull(list.get(i));
+            assertSpying(list.get(i), i);
+        }
+    }
+
+    private void assertSpying(Spying spying, int i) {
+        assertEquals(spying.aLong, i);
+        assertEquals(spying.anInt, i);
+        assertEquals(spying.aString, String.format(STR_PREFIX, i));
+    }
+
+    /** Writes values derived from {@code i} into the given instance and returns it. */
+    private Spying createSpyingClass(Spying spying, int i) {
+        spying.aLong   = i;
+        spying.anInt   = i;
+        spying.aString = String.format(STR_PREFIX, i);
+
+        return spying;
+    }
+
+    /** Takes {@code numElements} items from the buffer and populates them with consecutive indices. */
+    private void addElements(SpyingFixedBufferList fixedBufferList, int startIndex, int numElements) {
+        for (int i = startIndex; i < (startIndex + numElements); i++) {
+            Spying spyForUpdate = fixedBufferList.next();
+
+            createSpyingClass(spyForUpdate, i);
+        }
+    }
+}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
index 79527ac..ca6f373 100644
---
a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
+++
b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java
@@ -17,6 +17,7 @@
*/
package org.apache.atlas.repository.audit;
+import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.EntityAuditEvent.EntityAuditAction;
import org.apache.atlas.RequestContext;
import org.apache.atlas.model.audit.EntityAuditEventV2;
@@ -36,6 +37,7 @@ import org.apache.atlas.type.AtlasType;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasJson;
import org.apache.atlas.utils.AtlasPerfMetrics.MetricRecorder;
+import org.apache.atlas.utils.FixedBufferList;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
@@ -45,8 +47,6 @@ import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -74,6 +74,9 @@ import static
org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditActionV
@Component
public class EntityAuditListenerV2 implements EntityChangeListenerV2 {
private static final Logger LOG =
LoggerFactory.getLogger(EntityAuditListenerV2.class);
+ private static final ThreadLocal<FixedBufferList<EntityAuditEventV2>>
AUDIT_EVENTS_BUFFER =
+ ThreadLocal.withInitial(() -> new
FixedBufferList<>(EntityAuditEventV2.class,
+
AtlasConfiguration.NOTIFICATION_FIXED_BUFFER_ITEMS_INCREMENT_COUNT.getInt()));
private final EntityAuditRepository auditRepository;
private final AtlasTypeRegistry typeRegistry;
@@ -90,15 +93,12 @@ public class EntityAuditListenerV2 implements
EntityChangeListenerV2 {
public void onEntitiesAdded(List<AtlasEntity> entities, boolean isImport)
throws AtlasBaseException {
MetricRecorder metric =
RequestContext.get().startMetricRecord("entityAudit");
- List<EntityAuditEventV2> events = new ArrayList<>();
-
+ FixedBufferList<EntityAuditEventV2> entitiesAdded =
getAuditEventsList();
for (AtlasEntity entity : entities) {
- EntityAuditEventV2 event = createEvent(entity, isImport ?
ENTITY_IMPORT_CREATE : ENTITY_CREATE);
-
- events.add(event);
+ createEvent(entitiesAdded.next(), entity, isImport ?
ENTITY_IMPORT_CREATE : ENTITY_CREATE);
}
- auditRepository.putEventsV2(events);
+ auditRepository.putEventsV2(entitiesAdded.toList());
RequestContext.get().endMetricRecord(metric);
}
@@ -107,15 +107,12 @@ public class EntityAuditListenerV2 implements
EntityChangeListenerV2 {
public void onEntitiesUpdated(List<AtlasEntity> entities, boolean
isImport) throws AtlasBaseException {
MetricRecorder metric =
RequestContext.get().startMetricRecord("entityAudit");
- List<EntityAuditEventV2> events = new ArrayList<>();
-
+ FixedBufferList<EntityAuditEventV2> updatedEvents =
getAuditEventsList();
for (AtlasEntity entity : entities) {
- EntityAuditEventV2 event = createEvent(entity, isImport ?
ENTITY_IMPORT_UPDATE : ENTITY_UPDATE);
-
- events.add(event);
+ createEvent(updatedEvents.next(), entity, isImport ?
ENTITY_IMPORT_UPDATE : ENTITY_UPDATE);
}
- auditRepository.putEventsV2(events);
+ auditRepository.putEventsV2(updatedEvents.toList());
RequestContext.get().endMetricRecord(metric);
}
@@ -124,15 +121,12 @@ public class EntityAuditListenerV2 implements
EntityChangeListenerV2 {
public void onEntitiesDeleted(List<AtlasEntity> entities, boolean
isImport) throws AtlasBaseException {
MetricRecorder metric =
RequestContext.get().startMetricRecord("entityAudit");
- List<EntityAuditEventV2> events = new ArrayList<>();
-
+ FixedBufferList<EntityAuditEventV2> deletedEntities =
getAuditEventsList();
for (AtlasEntity entity : entities) {
- EntityAuditEventV2 event = createEvent(entity, isImport ?
ENTITY_IMPORT_DELETE : ENTITY_DELETE, "Deleted entity");
-
- events.add(event);
+ createEvent(deletedEntities.next(), entity, isImport ?
ENTITY_IMPORT_DELETE : ENTITY_DELETE, "Deleted entity");
}
- auditRepository.putEventsV2(events);
+ auditRepository.putEventsV2(deletedEntities.toList());
RequestContext.get().endMetricRecord(metric);
}
@@ -141,14 +135,12 @@ public class EntityAuditListenerV2 implements
EntityChangeListenerV2 {
public void onEntitiesPurged(List<AtlasEntity> entities) throws
AtlasBaseException {
MetricRecorder metric =
RequestContext.get().startMetricRecord("entityAudit");
- List<EntityAuditEventV2> events = new ArrayList<>();
-
+ FixedBufferList<EntityAuditEventV2> eventsPurged =
getAuditEventsList();
for (AtlasEntity entity : entities) {
- EntityAuditEventV2 event = createEvent(entity, ENTITY_PURGE);
- events.add(event);
+ createEvent(eventsPurged.next(), entity, ENTITY_PURGE);
}
- auditRepository.putEventsV2(events);
+ auditRepository.putEventsV2(eventsPurged.toList());
RequestContext.get().endMetricRecord(metric);
}
@@ -158,17 +150,16 @@ public class EntityAuditListenerV2 implements
EntityChangeListenerV2 {
if (CollectionUtils.isNotEmpty(classifications)) {
MetricRecorder metric =
RequestContext.get().startMetricRecord("entityAudit");
- List<EntityAuditEventV2> events = new ArrayList<>();
-
+ FixedBufferList<EntityAuditEventV2> classificationsAdded =
getAuditEventsList();
for (AtlasClassification classification : classifications) {
if (entity.getGuid().equals(classification.getEntityGuid())) {
- events.add(createEvent(entity, CLASSIFICATION_ADD, "Added
classification: " + AtlasType.toJson(classification)));
+ createEvent(classificationsAdded.next(), entity,
CLASSIFICATION_ADD, "Added classification: " +
AtlasType.toJson(classification));
} else {
- events.add(createEvent(entity,
PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " +
AtlasType.toJson(classification)));
+ createEvent(classificationsAdded.next(), entity,
PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " +
AtlasType.toJson(classification));
}
}
- auditRepository.putEventsV2(events);
+ auditRepository.putEventsV2(classificationsAdded.toList());
RequestContext.get().endMetricRecord(metric);
}
@@ -177,20 +168,20 @@ public class EntityAuditListenerV2 implements
EntityChangeListenerV2 {
@Override
public void onClassificationsAdded(List<AtlasEntity> entities,
List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
- MetricRecorder metric =
RequestContext.get().startMetricRecord("entityAudit");
- List<EntityAuditEventV2> events = Collections.synchronizedList(new
ArrayList<>());
+ MetricRecorder metric =
RequestContext.get().startMetricRecord("entityAudit");
+ FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
for (AtlasClassification classification : classifications) {
for (AtlasEntity entity : entities) {
if
(entity.getGuid().equals(classification.getEntityGuid())) {
- events.add(createEvent(entity, CLASSIFICATION_ADD,
"Added classification: " + AtlasType.toJson(classification)));
+ createEvent(events.next(), entity, CLASSIFICATION_ADD,
"Added classification: " + AtlasType.toJson(classification));
} else {
- events.add(createEvent(entity,
PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " +
AtlasType.toJson(classification)));
+ createEvent(events.next(), entity,
PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " +
AtlasType.toJson(classification));
}
}
}
- auditRepository.putEventsV2(events);
+ auditRepository.putEventsV2(events.toList());
RequestContext.get().endMetricRecord(metric);
}
@@ -201,24 +192,24 @@ public class EntityAuditListenerV2 implements
EntityChangeListenerV2 {
if (CollectionUtils.isNotEmpty(classifications)) {
MetricRecorder metric =
RequestContext.get().startMetricRecord("entityAudit");
- List<EntityAuditEventV2> events = new ArrayList<>();
- String guid = entity.getGuid();
+ FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
+ String guid = entity.getGuid();
for (AtlasClassification classification : classifications) {
if (guid.equals(classification.getEntityGuid())) {
- events.add(createEvent(entity, CLASSIFICATION_UPDATE,
"Updated classification: " + AtlasType.toJson(classification)));
+ createEvent(events.next(), entity, CLASSIFICATION_UPDATE,
"Updated classification: " + AtlasType.toJson(classification));
} else {
if (isPropagatedClassificationAdded(guid, classification))
{
- events.add(createEvent(entity,
PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " +
AtlasType.toJson(classification)));
+ createEvent(events.next(), entity,
PROPAGATED_CLASSIFICATION_ADD, "Added propagated classification: " +
AtlasType.toJson(classification));
} else if (isPropagatedClassificationDeleted(guid,
classification)) {
- events.add(createEvent(entity,
PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " +
classification.getTypeName()));
+ createEvent(events.next(), entity,
PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " +
classification.getTypeName());
} else {
- events.add(createEvent(entity,
PROPAGATED_CLASSIFICATION_UPDATE, "Updated propagated classification: " +
AtlasType.toJson(classification)));
+ createEvent(events.next(), entity,
PROPAGATED_CLASSIFICATION_UPDATE, "Updated propagated classification: " +
AtlasType.toJson(classification));
}
}
}
- auditRepository.putEventsV2(events);
+ auditRepository.putEventsV2(events.toList());
RequestContext.get().endMetricRecord(metric);
}
@@ -227,19 +218,19 @@ public class EntityAuditListenerV2 implements
EntityChangeListenerV2 {
@Override
public void onClassificationsDeleted(AtlasEntity entity,
List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications)) {
- MetricRecorder metric =
RequestContext.get().startMetricRecord("entityAudit");
+ MetricRecorder metric =
RequestContext.get().startMetricRecord("onClassificationsDeleted");
- List<EntityAuditEventV2> events = new ArrayList<>();
+ FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
for (AtlasClassification classification : classifications) {
if (StringUtils.equals(entity.getGuid(),
classification.getEntityGuid())) {
- events.add(createEvent(entity, CLASSIFICATION_DELETE,
"Deleted classification: " + classification.getTypeName()));
+ createEvent(events.next(), entity, CLASSIFICATION_DELETE,
"Deleted classification: " + classification.getTypeName());
} else {
- events.add(createEvent(entity,
PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " +
classification.getTypeName()));
+ createEvent(events.next(), entity,
PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " +
classification.getTypeName());
}
}
- auditRepository.putEventsV2(events);
+ auditRepository.putEventsV2(events.toList());
RequestContext.get().endMetricRecord(metric);
}
@@ -248,20 +239,20 @@ public class EntityAuditListenerV2 implements
EntityChangeListenerV2 {
@Override
public void onClassificationsDeleted(List<AtlasEntity> entities,
List<AtlasClassification> classifications) throws AtlasBaseException {
if (CollectionUtils.isNotEmpty(classifications) &&
CollectionUtils.isNotEmpty(entities)) {
- MetricRecorder metric =
RequestContext.get().startMetricRecord("entityAudit");
- List<EntityAuditEventV2> events = Collections.synchronizedList(new
ArrayList<>());
+ MetricRecorder metric =
RequestContext.get().startMetricRecord("onClassificationsDeleted");
+ FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
for (AtlasClassification classification : classifications) {
for (AtlasEntity entity : entities) {
if (StringUtils.equals(entity.getGuid(),
classification.getEntityGuid())) {
- events.add(createEvent(entity, CLASSIFICATION_DELETE,
"Deleted classification: " + classification.getTypeName()));
+ createEvent(events.next(), entity,
CLASSIFICATION_DELETE, "Deleted classification: " +
classification.getTypeName());
} else {
- events.add(createEvent(entity,
PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " +
classification.getTypeName()));
+ createEvent(events.next(), entity,
PROPAGATED_CLASSIFICATION_DELETE, "Deleted propagated classification: " +
classification.getTypeName());
}
}
}
- auditRepository.putEventsV2(events);
+ auditRepository.putEventsV2(events.toList());
RequestContext.get().endMetricRecord(metric);
}
@@ -270,19 +261,19 @@ public class EntityAuditListenerV2 implements
EntityChangeListenerV2 {
@Override
public void onTermAdded(AtlasGlossaryTerm term, List<AtlasRelatedObjectId>
entities) throws AtlasBaseException {
if (term != null && CollectionUtils.isNotEmpty(entities)) {
- MetricRecorder metric =
RequestContext.get().startMetricRecord("entityAudit");
+ MetricRecorder metric =
RequestContext.get().startMetricRecord("onTermAdded");
- List<EntityAuditEventV2> events = new ArrayList<>();
+ FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
for (AtlasRelatedObjectId relatedObjectId : entities) {
AtlasEntity entity =
instanceConverter.getAndCacheEntity(relatedObjectId.getGuid());
if (entity != null) {
- events.add(createEvent(entity, TERM_ADD, "Added term: " +
term.toAuditString()));
+ createEvent(events.next(), entity, TERM_ADD, "Added term:
" + term.toAuditString());
}
}
- auditRepository.putEventsV2(events);
+ auditRepository.putEventsV2(events.toList());
RequestContext.get().endMetricRecord(metric);
}
@@ -291,19 +282,19 @@ public class EntityAuditListenerV2 implements
EntityChangeListenerV2 {
@Override
public void onTermDeleted(AtlasGlossaryTerm term,
List<AtlasRelatedObjectId> entities) throws AtlasBaseException {
if (term != null && CollectionUtils.isNotEmpty(entities)) {
- MetricRecorder metric =
RequestContext.get().startMetricRecord("entityAudit");
+ MetricRecorder metric =
RequestContext.get().startMetricRecord("onTermDeleted");
- List<EntityAuditEventV2> events = new ArrayList<>();
+ FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
for (AtlasRelatedObjectId relatedObjectId : entities) {
AtlasEntity entity =
instanceConverter.getAndCacheEntity(relatedObjectId.getGuid());
if (entity != null) {
- events.add(createEvent(entity, TERM_DELETE, "Deleted term:
" + term.toAuditString()));
+ createEvent(events.next(), entity, TERM_DELETE, "Deleted
term: " + term.toAuditString());
}
}
- auditRepository.putEventsV2(events);
+ auditRepository.putEventsV2(events.toList());
RequestContext.get().endMetricRecord(metric);
}
@@ -312,15 +303,15 @@ public class EntityAuditListenerV2 implements
EntityChangeListenerV2 {
@Override
public void onLabelsAdded(AtlasEntity entity, Set<String> labels) throws
AtlasBaseException {
if (CollectionUtils.isNotEmpty(labels)) {
- MetricRecorder metric =
RequestContext.get().startMetricRecord("entityAudit");
+ MetricRecorder metric =
RequestContext.get().startMetricRecord("onLabelsAdded");
- List<EntityAuditEventV2> events = new ArrayList<>();
+ FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
String addedLabels = StringUtils.join(labels, " ");
- events.add(createEvent(entity, LABEL_ADD, "Added labels: " +
addedLabels));
+ createEvent(events.next(), entity, LABEL_ADD, "Added labels: " +
addedLabels);
- auditRepository.putEventsV2(events);
+ auditRepository.putEventsV2(events.toList());
RequestContext.get().endMetricRecord(metric);
}
@@ -329,15 +320,15 @@ public class EntityAuditListenerV2 implements
EntityChangeListenerV2 {
@Override
public void onLabelsDeleted(AtlasEntity entity, Set<String> labels) throws
AtlasBaseException {
if (CollectionUtils.isNotEmpty(labels)) {
- MetricRecorder metric =
RequestContext.get().startMetricRecord("entityAudit");
+ MetricRecorder metric =
RequestContext.get().startMetricRecord("onLabelsDeleted");
- List<EntityAuditEventV2> events = new ArrayList<>();
+ FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
String deletedLabels = StringUtils.join(labels, " ");
- events.add(createEvent(entity, LABEL_DELETE, "Deleted labels: " +
deletedLabels));
+ createEvent(events.next(), entity, LABEL_DELETE, "Deleted labels:
" + deletedLabels);
- auditRepository.putEventsV2(events);
+ auditRepository.putEventsV2(events.toList());
RequestContext.get().endMetricRecord(metric);
}
@@ -376,33 +367,38 @@ public class EntityAuditListenerV2 implements
EntityChangeListenerV2 {
if (MapUtils.isNotEmpty(updatedBusinessAttributes)) {
MetricRecorder metric =
RequestContext.get().startMetricRecord("entityAudit");
- List<EntityAuditEventV2> auditEvents = new ArrayList<>();
+ FixedBufferList<EntityAuditEventV2> events = getAuditEventsList();
for (Map.Entry<String, Map<String, Object>> entry :
updatedBusinessAttributes.entrySet()) {
String bmName = entry.getKey();
Map<String, Object> attributes = entry.getValue();
String details = AtlasJson.toJson(new
AtlasStruct(bmName, attributes));
- EntityAuditEventV2 auditEvent = createEvent(entity,
BUSINESS_ATTRIBUTE_UPDATE, "Updated business attributes: " + details);
- auditEvents.add(auditEvent);
+ createEvent(events.next(), entity, BUSINESS_ATTRIBUTE_UPDATE,
"Updated business attributes: " + details);
}
- auditRepository.putEventsV2(auditEvents);
+ auditRepository.putEventsV2(events.toList());
RequestContext.get().endMetricRecord(metric);
}
}
- private EntityAuditEventV2 createEvent(AtlasEntity entity,
EntityAuditActionV2 action, String details) {
- return new EntityAuditEventV2(entity.getGuid(),
RequestContext.get().getRequestTime(),
- RequestContext.get().getUser(), action,
details, entity);
+ private EntityAuditEventV2 createEvent(EntityAuditEventV2
entityAuditEventV2, AtlasEntity entity, EntityAuditActionV2 action, String
details) {
+ entityAuditEventV2.setEntityId(entity.getGuid());
+ entityAuditEventV2.setTimestamp(RequestContext.get().getRequestTime());
+ entityAuditEventV2.setUser(RequestContext.get().getUser());
+ entityAuditEventV2.setAction(action);
+ entityAuditEventV2.setDetails(details);
+ entityAuditEventV2.setEntity(entity);
+
+ return entityAuditEventV2;
}
- private EntityAuditEventV2 createEvent(AtlasEntity entity,
EntityAuditActionV2 action) {
+ private EntityAuditEventV2 createEvent(EntityAuditEventV2 event,
AtlasEntity entity, EntityAuditActionV2 action) {
String detail = getAuditEventDetail(entity, action);
- return createEvent(entity, action, detail);
+ return createEvent(event, entity, action, detail);
}
private String getAuditEventDetail(AtlasEntity entity, EntityAuditActionV2
action) {
@@ -619,4 +615,11 @@ public class EntityAuditListenerV2 implements
EntityChangeListenerV2 {
return ret;
}
+
+    /**
+     * Fetches the calling thread's event buffer from {@code AUDIT_EVENTS_BUFFER}
+     * and resets it so the caller starts with no events.
+     * <p>
+     * The reset happens here, at the start of a use, not after the events are
+     * written; whatever the previous call on this thread put into the buffer
+     * stays referenced until this method runs again on that thread.
+     *
+     * @return the current thread's buffer, freshly reset
+     */
+    private FixedBufferList<EntityAuditEventV2> getAuditEventsList() {
+        final FixedBufferList<EntityAuditEventV2> threadBuffer = AUDIT_EVENTS_BUFFER.get();
+
+        threadBuffer.reset();
+
+        return threadBuffer;
+    }
}