Repository: atlas Updated Branches: refs/heads/master 2ed39cce3 -> 943eb2bb0
ATLAS-2198: fix for Hive Hook OOM for large notification messages Signed-off-by: Madhan Neethiraj <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/943eb2bb Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/943eb2bb Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/943eb2bb Branch: refs/heads/master Commit: 943eb2bb01d8a2b7c59eef49ea7559b176102087 Parents: 2ed39cc Author: apoorvnaik <[email protected]> Authored: Tue Oct 10 15:26:29 2017 -0700 Committer: Madhan Neethiraj <[email protected]> Committed: Tue Oct 10 16:43:16 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/atlas/hive/hook/HiveHook.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/943eb2bb/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index aca5645..f815773 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -59,6 +59,7 @@ import java.net.MalformedURLException; import java.net.URI; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; import java.util.Date; import java.util.HashMap; @@ -698,7 +699,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { } colLineageProcessInstances.add(0, processReferenceable); entities.addAll(colLineageProcessInstances); - event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), new ArrayList<>(entities))); + + addEntityUpdateNotificationMessagess(event, entities); } else { LOG.info("Skipped query {} since it has no getInputs() or resulting getOutputs()", event.getQueryStr()); } @@ -711,6 +713,13 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { } } + private void addEntityUpdateNotificationMessagess(final HiveEventContext event, final Collection<Referenceable> entities) { + // process each entity as separate message to avoid running into OOM errors + for (Referenceable entity : entities) { + event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), entity)); + } + } + private <T extends Entity> void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, T entity, Set<String> dataSetsProcessed, SortedMap<T, Referenceable> dataSets, Set<Referenceable> entities) throws AtlasHookException { try { @@ -801,7 +810,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { entities.addAll(tables.values()); entities.add(processReferenceable); - event.addMessage(new HookNotification.EntityUpdateRequest(event.getUser(), entities)); + + addEntityUpdateNotificationMessagess(event, entities); } }
