This is an automated email from the ASF dual-hosted git repository. sarath pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 97f6f7ca3da96c22174dd573a6b2a3cc1db51b62 Author: Sarath Subramanian <ssubraman...@hortonworks.com> AuthorDate: Wed Apr 17 16:20:38 2019 -0700 ATLAS-3148: Implement Hive Metastore hook for Atlas (cherry picked from commit 0174bac0b3fc6547441fbb6a8e64a8e8ae990050) --- .../apache/atlas/hive/hook/HiveMetastoreHook.java | 199 +++++++++++++++++++++ .../atlas/hive/hook/AtlasHiveHookContext.java | 134 +++++++++++--- .../java/org/apache/atlas/hive/hook/HiveHook.java | 21 ++- .../atlas/hive/hook/HiveMetastoreHookImpl.java | 193 ++++++++++++++++++++ .../atlas/hive/hook/HiveOperationContext.java | 72 ++++++++ .../atlas/hive/hook/events/AlterDatabase.java | 34 +++- .../apache/atlas/hive/hook/events/AlterTable.java | 4 +- .../atlas/hive/hook/events/AlterTableRename.java | 52 ++++-- .../hive/hook/events/AlterTableRenameCol.java | 88 ++++++--- .../atlas/hive/hook/events/BaseHiveEvent.java | 127 +++++++++---- .../atlas/hive/hook/events/CreateDatabase.java | 31 +++- .../atlas/hive/hook/events/CreateHiveProcess.java | 15 +- .../apache/atlas/hive/hook/events/CreateTable.java | 83 ++++++--- .../atlas/hive/hook/events/DropDatabase.java | 35 +++- .../apache/atlas/hive/hook/events/DropTable.java | 30 +++- 15 files changed, 957 insertions(+), 161 deletions(-) diff --git a/addons/hive-bridge-shim/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHook.java b/addons/hive-bridge-shim/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHook.java new file mode 100644 index 0000000..2894e99 --- /dev/null +++ b/addons/hive-bridge-shim/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHook.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.hive.hook; + +import org.apache.atlas.plugin.classloader.AtlasPluginClassLoader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.MetaStoreEventListener; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.events.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Hive Metastore hook to capture DDL operations for atlas entity registration. 
+ */ +public class HiveMetastoreHook extends MetaStoreEventListener { + private static final String ATLAS_PLUGIN_TYPE = "hive"; + private static final String ATLAS_HIVE_METASTORE_HOOK_IMPL_CLASSNAME = "org.apache.atlas.hive.hook.HiveMetastoreHookImpl"; + public static final Logger LOG = LoggerFactory.getLogger(HiveMetastoreHook.class); + + private AtlasPluginClassLoader atlasPluginClassLoader = null; + private MetaStoreEventListener atlasMetastoreHookImpl = null; + private Configuration config; + + public HiveMetastoreHook(Configuration config) { + super(config); + + this.config = config; + + this.initialize(); + } + + private void initialize() { + if (LOG.isDebugEnabled()) { + LOG.debug("==> HiveMetastoreHook.initialize()"); + } + + try { + atlasPluginClassLoader = AtlasPluginClassLoader.getInstance(ATLAS_PLUGIN_TYPE, this.getClass()); + + @SuppressWarnings("unchecked") + Class<MetaStoreEventListener> cls = (Class<MetaStoreEventListener>) + Class.forName(ATLAS_HIVE_METASTORE_HOOK_IMPL_CLASSNAME, true, atlasPluginClassLoader); + + activatePluginClassLoader(); + + atlasMetastoreHookImpl = cls.getDeclaredConstructor(Configuration.class).newInstance(config); + } catch (Exception ex) { + LOG.error("Error instantiating Atlas hook implementation", ex); + } finally { + deactivatePluginClassLoader(); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== HiveMetastoreHook.initialize()"); + } + } + + @Override + public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> HiveMetastoreHook.onCreateTable()"); + } + + try { + activatePluginClassLoader(); + + atlasMetastoreHookImpl.onCreateTable(tableEvent); + } finally { + deactivatePluginClassLoader(); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== HiveMetastoreHook.onCreateTable()"); + } + } + + @Override + public void onDropTable(DropTableEvent tableEvent) throws MetaException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> 
HiveMetastoreHook.onDropTable()"); + } + + try { + activatePluginClassLoader(); + + atlasMetastoreHookImpl.onDropTable(tableEvent); + } finally { + deactivatePluginClassLoader(); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== HiveMetastoreHook.onDropTable()"); + } + } + + @Override + public void onAlterTable(AlterTableEvent tableEvent) throws MetaException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> HiveMetastoreHook.onAlterTable()"); + } + + try { + activatePluginClassLoader(); + + atlasMetastoreHookImpl.onAlterTable(tableEvent); + } finally { + deactivatePluginClassLoader(); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== HiveMetastoreHook.onAlterTable()"); + } + } + + @Override + public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> HiveMetastoreHook.onCreateDatabase()"); + } + + try { + activatePluginClassLoader(); + + atlasMetastoreHookImpl.onCreateDatabase(dbEvent); + } finally { + deactivatePluginClassLoader(); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== HiveMetastoreHook.onCreateDatabase()"); + } + } + + @Override + public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> HiveMetastoreHook.onDropDatabase()"); + } + + try { + activatePluginClassLoader(); + + atlasMetastoreHookImpl.onDropDatabase(dbEvent); + } finally { + deactivatePluginClassLoader(); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== HiveMetastoreHook.onDropDatabase()"); + } + } + + @Override + public void onAlterDatabase(AlterDatabaseEvent dbEvent) throws MetaException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> HiveMetastoreHook.onAlterDatabase()"); + } + + try { + activatePluginClassLoader(); + + atlasMetastoreHookImpl.onAlterDatabase(dbEvent); + } finally { + deactivatePluginClassLoader(); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== HiveMetastoreHook.onAlterDatabase()"); + } + } + + private void 
activatePluginClassLoader() { + if (atlasPluginClassLoader != null) { + atlasPluginClassLoader.activate(); + } + } + + private void deactivatePluginClassLoader() { + if (atlasPluginClassLoader != null) { + atlasPluginClassLoader.deactivate(); + } + } +} \ No newline at end of file diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java index 44c6437..52da710 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java @@ -19,21 +19,25 @@ package org.apache.atlas.hive.hook; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.hive.hook.HiveMetastoreHookImpl.HiveMetastoreHook; import org.apache.atlas.hive.hook.HiveHook.PreprocessAction; import org.apache.atlas.hive.hook.HiveHook.HiveHookObjectNamesCache; -import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.hive.metastore.IHMSHandler; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.events.*; import org.apache.hadoop.hive.ql.hooks.HookContext; +import org.apache.hadoop.hive.ql.hooks.LineageInfo; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.session.SessionState; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; +import java.util.*; + +import static org.apache.atlas.hive.hook.events.BaseHiveEvent.toTable; public class AtlasHiveHookContext { @@ -41,6 +45,8 @@ public class AtlasHiveHookContext { public static final char QNAME_SEP_ENTITY_NAME = '.'; 
public static final char QNAME_SEP_PROCESS = ':'; public static final String TEMP_TABLE_PREFIX = "_temp-"; + public static final String CREATE_OPERATION = "CREATE"; + public static final String ALTER_OPERATION = "ALTER"; private final HiveHook hook; private final HiveOperation hiveOperation; @@ -48,17 +54,58 @@ public class AtlasHiveHookContext { private final Hive hive; private final Map<String, AtlasEntity> qNameEntityMap = new HashMap<>(); private final HiveHookObjectNamesCache knownObjects; + private final HiveMetastoreHook metastoreHook; + private final ListenerEvent metastoreEvent; + private final IHMSHandler metastoreHandler; + + public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HookContext hiveContext, + HiveHookObjectNamesCache knownObjects) throws Exception { + this(hook, hiveOperation, hiveContext, knownObjects, null, null); + } - public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HookContext hiveContext, HiveHookObjectNamesCache knownObjects) throws Exception { - this.hook = hook; - this.hiveOperation = hiveOperation; - this.hiveContext = hiveContext; - this.hive = Hive.get(hiveContext.getConf()); - this.knownObjects = knownObjects; + public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HiveHookObjectNamesCache knownObjects, + HiveMetastoreHook metastoreHook, ListenerEvent listenerEvent) throws Exception { + this(hook, hiveOperation, null, knownObjects, metastoreHook, listenerEvent); + } + + public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HookContext hiveContext, HiveHookObjectNamesCache knownObjects, + HiveMetastoreHook metastoreHook, ListenerEvent listenerEvent) throws Exception { + this.hook = hook; + this.hiveOperation = hiveOperation; + this.hiveContext = hiveContext; + this.hive = hiveContext != null ? 
Hive.get(hiveContext.getConf()) : null; + this.knownObjects = knownObjects; + this.metastoreHook = metastoreHook; + this.metastoreEvent = listenerEvent; + this.metastoreHandler = (listenerEvent != null) ? metastoreEvent.getIHMSHandler() : null; init(); } + public boolean isMetastoreHook() { + return metastoreHook != null; + } + + public ListenerEvent getMetastoreEvent() { + return metastoreEvent; + } + + public IHMSHandler getMetastoreHandler() { + return metastoreHandler; + } + + public Set<ReadEntity> getInputs() { + return hiveContext != null ? hiveContext.getInputs() : Collections.emptySet(); + } + + public Set<WriteEntity> getOutputs() { + return hiveContext != null ? hiveContext.getOutputs() : Collections.emptySet(); + } + + public LineageInfo getLineageInfo() { + return hiveContext != null ? hiveContext.getLinfo() : null; + } + public HookContext getHiveContext() { return hiveContext; } @@ -147,24 +194,59 @@ public class AtlasHiveHookContext { } private void init() { - if (knownObjects != null) { - String operationName = hiveContext.getOperationName(); - - if (operationName != null && operationName.startsWith("CREATE") || operationName.startsWith("ALTER")) { - if (CollectionUtils.isNotEmpty(hiveContext.getOutputs())) { - for (WriteEntity output : hiveContext.getOutputs()) { - switch (output.getType()) { - case DATABASE: - knownObjects.removeFromKnownDatabase(getQualifiedName(output.getDatabase())); - break; - - case TABLE: - knownObjects.removeFromKnownTable(getQualifiedName(output.getTable())); - break; - } + String operation = hiveOperation != null ? hiveOperation.getOperationName() : null; /* null-safe: a null operation is treated as "not CREATE/ALTER" and init() returns early below */ + + if (knownObjects == null || !isCreateAlterOperation(operation)) { + return; + } + + List<Database> databases = new ArrayList<>(); + List<Table> tables = new ArrayList<>(); + + if (isMetastoreHook()) { + switch (hiveOperation) { + case CREATEDATABASE: + databases.add(((CreateDatabaseEvent) metastoreEvent).getDatabase()); + break; + case ALTERDATABASE: + databases.add(((AlterDatabaseEvent) 
metastoreEvent).getOldDatabase()); + databases.add(((AlterDatabaseEvent) metastoreEvent).getNewDatabase()); + break; + case CREATETABLE: + tables.add(toTable(((CreateTableEvent) metastoreEvent).getTable())); + break; + case ALTERTABLE_PROPERTIES: + case ALTERTABLE_RENAME: + case ALTERTABLE_RENAMECOL: + tables.add(toTable(((AlterTableEvent) metastoreEvent).getOldTable())); + tables.add(toTable(((AlterTableEvent) metastoreEvent).getNewTable())); + break; + } + } else { + if (getOutputs() != null) { + for (WriteEntity output : getOutputs()) { /* iterate the same null-safe accessor that was just checked, not hiveContext directly */ + switch (output.getType()) { + case DATABASE: + databases.add(output.getDatabase()); + break; + case TABLE: + tables.add(output.getTable()); + break; } } } } + + for (Database database : databases) { + knownObjects.removeFromKnownDatabase(getQualifiedName(database)); + } + + for (Table table : tables) { + knownObjects.removeFromKnownTable(getQualifiedName(table)); + } + } + + private static boolean isCreateAlterOperation(String operationName) { + return operationName != null && (operationName.startsWith(CREATE_OPERATION) || operationName.startsWith(ALTER_OPERATION)); /* parenthesized: && binds tighter than ||, so the null check must guard both prefix tests */ } -} +} \ No newline at end of file diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index ee02285..6109297 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -146,17 +146,10 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { LOG.debug("==> HiveHook.run({})", hookContext.getOperationName()); } - if (knownObjects != null && knownObjects.isCacheExpired()) { - LOG.info("HiveHook.run(): purging cached databaseNames ({}) and tableNames ({})", knownObjects.getCachedDbCount(), knownObjects.getCachedTableCount()); - - knownObjects = new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, 
nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds); - } - try { HiveOperation oper = OPERATION_MAP.get(hookContext.getOperationName()); - AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext, knownObjects); - - BaseHiveEvent event = null; + AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext, getKnownObjects()); + BaseHiveEvent event = null; switch (oper) { case CREATEDATABASE: @@ -169,6 +162,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { case ALTERDATABASE: case ALTERDATABASE_OWNER: + case ALTERDATABASE_LOCATION: event = new AlterDatabase(context); break; @@ -288,6 +282,15 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { return ret; } + public static HiveHookObjectNamesCache getKnownObjects() { + if (knownObjects != null && knownObjects.isCacheExpired()) { + LOG.info("HiveHook.run(): purging cached databaseNames ({}) and tableNames ({})", knownObjects.getCachedDbCount(), knownObjects.getCachedTableCount()); + + knownObjects = new HiveHook.HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds); + } + + return knownObjects; + } public static class HiveHookObjectNamesCache { private final int dbMaxCacheCount; diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java new file mode 100644 index 0000000..251fddd --- /dev/null +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.hive.hook; + +import org.apache.atlas.hive.hook.events.*; +import org.apache.atlas.hook.AtlasHook; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.MetaStoreEventListener; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.events.*; +import org.apache.hadoop.hive.metastore.utils.SecurityUtils; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.HiveOperation; +import org.apache.hadoop.hive.shims.Utils; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.apache.atlas.hive.hook.events.AlterTableRenameCol.findRenamedColumn; +import static org.apache.atlas.hive.hook.events.BaseHiveEvent.toTable; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.*; + +public class HiveMetastoreHookImpl extends MetaStoreEventListener { + private static final Logger LOG = LoggerFactory.getLogger(HiveMetastoreHookImpl.class); + private final HiveHook hiveHook; + private final HiveMetastoreHook hook; + + public HiveMetastoreHookImpl(Configuration config) { + super(config); + + this.hiveHook = new HiveHook(); + this.hook = new HiveMetastoreHook(); + } + + @Override + public void onCreateDatabase(CreateDatabaseEvent dbEvent) { 
+ HiveOperationContext context = new HiveOperationContext(CREATEDATABASE, dbEvent); + + hook.handleEvent(context); + } + + @Override + public void onDropDatabase(DropDatabaseEvent dbEvent) { + HiveOperationContext context = new HiveOperationContext(DROPDATABASE, dbEvent); + + hook.handleEvent(context); + } + + @Override + public void onAlterDatabase(AlterDatabaseEvent dbEvent) { + HiveOperationContext context = new HiveOperationContext(ALTERDATABASE, dbEvent); + + hook.handleEvent(context); + } + + @Override + public void onCreateTable(CreateTableEvent tableEvent) { + HiveOperationContext context = new HiveOperationContext(CREATETABLE, tableEvent); + + hook.handleEvent(context); + } + + @Override + public void onDropTable(DropTableEvent tableEvent) { + HiveOperationContext context = new HiveOperationContext(DROPTABLE, tableEvent); + + hook.handleEvent(context); + } + + @Override + public void onAlterTable(AlterTableEvent tableEvent) { + HiveOperationContext context = new HiveOperationContext(tableEvent); + Table oldTable = toTable(tableEvent.getOldTable()); + Table newTable = toTable(tableEvent.getNewTable()); + + if (isTableRename(oldTable, newTable)) { + context.setOperation(ALTERTABLE_RENAME); + } else if (isColumnRename(oldTable, newTable, context)) { + context.setOperation(ALTERTABLE_RENAMECOL); + } else { + context.setOperation(ALTERTABLE_PROPERTIES); // map other alter table operations to ALTERTABLE_PROPERTIES + } + + hook.handleEvent(context); + } + + public class HiveMetastoreHook extends AtlasHook { + public HiveMetastoreHook() { + } + + public void handleEvent(HiveOperationContext operContext) { + ListenerEvent listenerEvent = operContext.getEvent(); + + if (!listenerEvent.getStatus()) { + return; + } + + try { + HiveOperation oper = operContext.getOperation(); + AtlasHiveHookContext context = new AtlasHiveHookContext(hiveHook, oper, hiveHook.getKnownObjects(), this, listenerEvent); + BaseHiveEvent event = null; + + switch (oper) { + case CREATEDATABASE: 
+ event = new CreateDatabase(context); + break; + + case DROPDATABASE: + event = new DropDatabase(context); + break; + + case ALTERDATABASE: + event = new AlterDatabase(context); + break; + + case CREATETABLE: + event = new CreateTable(context, true); + break; + + case DROPTABLE: + event = new DropTable(context); + break; + + case ALTERTABLE_PROPERTIES: + event = new AlterTable(context); + break; + + case ALTERTABLE_RENAME: + event = new AlterTableRename(context); + break; + + case ALTERTABLE_RENAMECOL: + FieldSchema columnOld = operContext.getColumnOld(); + FieldSchema columnNew = operContext.getColumnNew(); + + event = new AlterTableRenameCol(columnOld, columnNew, context); + break; + + default: + if (LOG.isDebugEnabled()) { + LOG.debug("HiveMetastoreHook.handleEvent({}): operation ignored.", listenerEvent); + } + break; + } + + if (event != null) { + final UserGroupInformation ugi = SecurityUtils.getUGI() == null ? Utils.getUGI() : SecurityUtils.getUGI(); + + super.notifyEntities(event.getNotificationMessages(), ugi); + } + } catch (Throwable t) { + LOG.error("HiveMetastoreHook.handleEvent({}): failed to process operation {}", listenerEvent, operContext.getOperation(), t); /* one argument per placeholder; the trailing Throwable is logged as the exception */ + } + } + } + + private static boolean isTableRename(Table oldTable, Table newTable) { + String oldTableName = oldTable.getTableName(); + String newTableName = newTable.getTableName(); + + return !StringUtils.equalsIgnoreCase(oldTableName, newTableName); + } + + private static boolean isColumnRename(Table oldTable, Table newTable, HiveOperationContext context) { + FieldSchema columnOld = findRenamedColumn(oldTable, newTable); + FieldSchema columnNew = findRenamedColumn(newTable, oldTable); + boolean isColumnRename = columnOld != null && columnNew != null; + + if (isColumnRename) { + context.setColumnOld(columnOld); + context.setColumnNew(columnNew); + } + + return isColumnRename; + } +} \ No newline at end of file diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveOperationContext.java 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveOperationContext.java new file mode 100644 index 0000000..23ea4be --- /dev/null +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveOperationContext.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.hive.hook; + +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; +import org.apache.hadoop.hive.ql.plan.HiveOperation; + +public class HiveOperationContext { + HiveOperation operation; + ListenerEvent event; + FieldSchema columnOld; + FieldSchema columnNew; + + public HiveOperationContext(ListenerEvent event) { + this(null, event); + } + + public HiveOperationContext(HiveOperation operation, ListenerEvent event) { + setOperation(operation); + setEvent(event); + setColumnOld(null); + setColumnNew(null); + } + + public ListenerEvent getEvent() { + return event; + } + + public void setEvent(ListenerEvent event) { + this.event = event; + } + + public HiveOperation getOperation() { + return operation; + } + + public void setOperation(HiveOperation operation) { + this.operation = operation; + } + + public FieldSchema getColumnOld() { + return columnOld; + } + + public void setColumnOld(FieldSchema columnOld) { + this.columnOld = columnOld; + } + + public FieldSchema getColumnNew() { + return columnNew; + } + + public void setColumnNew(FieldSchema columnNew) { + this.columnNew = columnNew; + } +} diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterDatabase.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterDatabase.java index 6808574..6b01c4e 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterDatabase.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterDatabase.java @@ -19,15 +19,22 @@ package org.apache.atlas.hive.hook.events; import org.apache.atlas.hive.hook.AtlasHiveHookContext; +import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.notification.HookNotification; import 
org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2; import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.events.AlterDatabaseEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; public class AlterDatabase extends CreateDatabase { + private static final Logger LOG = LoggerFactory.getLogger(AlterDatabase.class); + public AlterDatabase(AtlasHiveHookContext context) { super(context); } @@ -35,7 +42,7 @@ public class AlterDatabase extends CreateDatabase { @Override public List<HookNotification> getNotificationMessages() throws Exception { List<HookNotification> ret = null; - AtlasEntitiesWithExtInfo entities = getEntities(); + AtlasEntitiesWithExtInfo entities = context.isMetastoreHook() ? getHiveMetastoreEntities() : getHiveEntities(); if (entities != null && CollectionUtils.isNotEmpty(entities.getEntities())) { ret = Collections.singletonList(new EntityUpdateRequestV2(getUserName(), entities)); @@ -43,4 +50,27 @@ public class AlterDatabase extends CreateDatabase { return ret; } -} + + public AtlasEntitiesWithExtInfo getHiveMetastoreEntities() throws Exception { + AtlasEntitiesWithExtInfo ret = new AtlasEntitiesWithExtInfo(); + AlterDatabaseEvent dbEvent = (AlterDatabaseEvent) context.getMetastoreEvent(); + Database oldDb = dbEvent.getOldDatabase(); + Database newDb = dbEvent.getNewDatabase(); + + if (newDb != null) { + AtlasEntity dbEntity = toDbEntity(newDb); + + ret.addEntity(dbEntity); + } else { + LOG.error("AlterDatabase.getEntities(): failed to retrieve db"); + } + + addProcessedEntities(ret); + + return ret; + } + + public AtlasEntitiesWithExtInfo getHiveEntities() throws Exception { + return super.getHiveEntities(); + } +} \ No newline at end of file diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTable.java 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTable.java index adad81a..e164370 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTable.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTable.java @@ -35,7 +35,7 @@ public class AlterTable extends CreateTable { @Override public List<HookNotification> getNotificationMessages() throws Exception { List<HookNotification> ret = null; - AtlasEntitiesWithExtInfo entities = getEntities(); + AtlasEntitiesWithExtInfo entities = context.isMetastoreHook() ? getHiveMetastoreEntities() : getHiveEntities(); if (entities != null && CollectionUtils.isNotEmpty(entities.getEntities())) { ret = Collections.singletonList(new EntityUpdateRequestV2(getUserName(), entities)); @@ -43,4 +43,4 @@ public class AlterTable extends CreateTable { return ret; } -} +} \ No newline at end of file diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java index 7e15d0e..d3d8349 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java @@ -29,6 +29,7 @@ import org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateR import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Table; @@ -49,24 +50,48 @@ public class AlterTableRename extends BaseHiveEvent { @Override public List<HookNotification> getNotificationMessages() throws Exception { + return 
context.isMetastoreHook() ? getHiveMetastoreMessages() : getHiveMessages(); + } + + public List<HookNotification> getHiveMetastoreMessages() throws Exception { + List<HookNotification> ret = new ArrayList<>(); + AlterTableEvent tblEvent = (AlterTableEvent) context.getMetastoreEvent(); + Table oldTable = toTable(tblEvent.getOldTable()); + Table newTable = toTable(tblEvent.getNewTable()); + + if (newTable == null) { + LOG.error("AlterTableRename: renamed table not found in outputs list"); + + return ret; + } + + processTables(oldTable, newTable, ret); + + return ret; + } + + public List<HookNotification> getHiveMessages() throws Exception { List<HookNotification> ret = new ArrayList<>(); + Table oldTable; + Table newTable; - if (CollectionUtils.isEmpty(getHiveContext().getInputs())) { + if (CollectionUtils.isEmpty(getInputs())) { LOG.error("AlterTableRename: old-table not found in inputs list"); return ret; } - Table oldTable = getHiveContext().getInputs().iterator().next().getTable(); - Table newTable = null; + oldTable = getInputs().iterator().next().getTable(); + newTable = null; - if (CollectionUtils.isNotEmpty(getHiveContext().getOutputs())) { - for (WriteEntity entity : getHiveContext().getOutputs()) { + if (CollectionUtils.isNotEmpty(getOutputs())) { + for (WriteEntity entity : getOutputs()) { if (entity.getType() == Entity.Type.TABLE) { newTable = entity.getTable(); //Hive sends with both old and new table names in the outputs which is weird. 
So skipping that with the below check - if (StringUtils.equalsIgnoreCase(newTable.getDbName(), oldTable.getDbName()) && StringUtils.equalsIgnoreCase(newTable.getTableName(), oldTable.getTableName())) { + if (StringUtils.equalsIgnoreCase(newTable.getDbName(), oldTable.getDbName()) && + StringUtils.equalsIgnoreCase(newTable.getTableName(), oldTable.getTableName())) { newTable = null; continue; @@ -85,11 +110,17 @@ public class AlterTableRename extends BaseHiveEvent { return ret; } + processTables(oldTable, newTable, ret); + + return ret; + } + + private void processTables(Table oldTable, Table newTable, List<HookNotification> ret) throws Exception { AtlasEntityWithExtInfo oldTableEntity = toTableEntity(oldTable); AtlasEntityWithExtInfo renamedTableEntity = toTableEntity(newTable); if (oldTableEntity == null || renamedTableEntity == null) { - return ret; + return; } // first update with oldTable info, so that the table will be created if it is not present in Atlas @@ -110,14 +141,13 @@ public class AlterTableRename extends BaseHiveEvent { // set previous name as the alias renamedTableEntity.getEntity().setAttribute(ATTRIBUTE_ALIASES, Collections.singletonList(oldTable.getTableName())); - AtlasObjectId oldTableId = new AtlasObjectId(oldTableEntity.getEntity().getTypeName(), ATTRIBUTE_QUALIFIED_NAME, oldTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + String oldTableQualifiedName = (String) oldTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME); + AtlasObjectId oldTableId = new AtlasObjectId(oldTableEntity.getEntity().getTypeName(), ATTRIBUTE_QUALIFIED_NAME, oldTableQualifiedName); // update qualifiedName and other attributes (like params - which include lastModifiedTime, lastModifiedBy) of the table ret.add(new EntityPartialUpdateRequestV2(getUserName(), oldTableId, renamedTableEntity)); - context.removeFromKnownTable((String) oldTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME)); - - return ret; + 
context.removeFromKnownTable(oldTableQualifiedName); } private void renameColumns(List<AtlasObjectId> columns, AtlasEntityExtInfo oldEntityExtInfo, String newTableQualifiedName, List<HookNotification> notifications) { diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRenameCol.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRenameCol.java index 5bbdd81..756a608 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRenameCol.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRenameCol.java @@ -26,6 +26,7 @@ import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateRequestV2; import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.apache.hadoop.hive.ql.metadata.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,64 +35,79 @@ import java.util.ArrayList; import java.util.List; public class AlterTableRenameCol extends AlterTable { - private static final Logger LOG = LoggerFactory.getLogger(AlterTableRenameCol.class); + private static final Logger LOG = LoggerFactory.getLogger(AlterTableRenameCol.class); + private final FieldSchema columnOld; + private final FieldSchema columnNew; public AlterTableRenameCol(AtlasHiveHookContext context) { + this(null, null, context); + } + + public AlterTableRenameCol(FieldSchema columnOld, FieldSchema columnNew, AtlasHiveHookContext context) { super(context); + + this.columnOld = columnOld; + this.columnNew = columnNew; } @Override public List<HookNotification> getNotificationMessages() throws Exception { - if (CollectionUtils.isEmpty(getHiveContext().getInputs())) { + return context.isMetastoreHook() ? 
getHiveMetastoreMessages() : getHiveMessages(); + } + + public List<HookNotification> getHiveMetastoreMessages() throws Exception { + List<HookNotification> baseMsgs = super.getNotificationMessages(); + List<HookNotification> ret = new ArrayList<>(baseMsgs); + AlterTableEvent tblEvent = (AlterTableEvent) context.getMetastoreEvent(); + Table oldTable = toTable(tblEvent.getOldTable()); + Table newTable = toTable(tblEvent.getNewTable()); + + processColumns(oldTable, newTable, ret); + + return ret; + } + + public List<HookNotification> getHiveMessages() throws Exception { + List<HookNotification> baseMsgs = super.getNotificationMessages(); + List<HookNotification> ret = new ArrayList<>(baseMsgs); + + if (CollectionUtils.isEmpty(getInputs())) { LOG.error("AlterTableRenameCol: old-table not found in inputs list"); return null; } - if (CollectionUtils.isEmpty(getHiveContext().getOutputs())) { + if (CollectionUtils.isEmpty(getOutputs())) { LOG.error("AlterTableRenameCol: new-table not found in outputs list"); return null; } - List<HookNotification> baseMsgs = super.getNotificationMessages(); - if (CollectionUtils.isEmpty(baseMsgs)) { LOG.debug("Skipped processing of column-rename (on a temporary table?)"); return null; } - List<HookNotification> ret = new ArrayList<>(baseMsgs); - Table oldTable = getHiveContext().getInputs().iterator().next().getTable(); - Table newTable = getHiveContext().getOutputs().iterator().next().getTable(); - - newTable = getHive().getTable(newTable.getDbName(), newTable.getTableName()); - - List<FieldSchema> oldColumns = oldTable.getCols(); - List<FieldSchema> newColumns = newTable.getCols(); - FieldSchema changedColumnOld = null; - FieldSchema changedColumnNew = null; - - for (FieldSchema oldColumn : oldColumns) { - if (!newColumns.contains(oldColumn)) { - changedColumnOld = oldColumn; + Table oldTable = getInputs().iterator().next().getTable(); + Table newTable = getOutputs().iterator().next().getTable(); - break; - } + if (newTable != null) { + 
newTable = getHive().getTable(newTable.getDbName(), newTable.getTableName()); } - for (FieldSchema newColumn : newColumns) { - if (!oldColumns.contains(newColumn)) { - changedColumnNew = newColumn; + processColumns(oldTable, newTable, ret); - break; - } - } + return ret; + } + + private void processColumns(Table oldTable, Table newTable, List<HookNotification> ret) { + FieldSchema changedColumnOld = (columnOld == null) ? findRenamedColumn(oldTable, newTable) : columnOld; + FieldSchema changedColumnNew = (columnNew == null) ? findRenamedColumn(newTable, oldTable) : columnNew; if (changedColumnOld != null && changedColumnNew != null) { AtlasObjectId oldColumnId = new AtlasObjectId(HIVE_TYPE_COLUMN, ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(oldTable, changedColumnOld)); - AtlasEntity newColumn = new AtlasEntity(HIVE_TYPE_COLUMN); + AtlasEntity newColumn = new AtlasEntity(HIVE_TYPE_COLUMN); newColumn.setAttribute(ATTRIBUTE_NAME, changedColumnNew.getName()); newColumn.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(newTable, changedColumnNew)); @@ -100,7 +116,21 @@ public class AlterTableRenameCol extends AlterTable { } else { LOG.error("AlterTableRenameCol: no renamed column detected"); } + } + + public static FieldSchema findRenamedColumn(Table inputTable, Table outputTable) { + FieldSchema ret = null; + List<FieldSchema> inputColumns = inputTable.getCols(); + List<FieldSchema> outputColumns = outputTable.getCols(); + + for (FieldSchema inputColumn : inputColumns) { + if (!outputColumns.contains(inputColumn)) { + ret = inputColumn; + + break; + } + } return ret; } -} +} \ No newline at end of file diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java index 31346d0..4127c61 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java +++ 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java @@ -37,11 +37,10 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.ql.hooks.Entity; -import org.apache.hadoop.hive.ql.hooks.HookContext; +import org.apache.hadoop.hive.metastore.utils.SecurityUtils; +import org.apache.hadoop.hive.ql.hooks.*; import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo; import org.apache.hadoop.hive.ql.hooks.LineageInfo.DependencyKey; -import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.HiveOperation; @@ -328,7 +327,8 @@ public abstract class BaseHiveEvent { } protected AtlasEntity toTableEntity(Table table, AtlasEntityExtInfo entityExtInfo) throws Exception { - AtlasEntity dbEntity = toDbEntity(getHive().getDatabase(table.getDbName())); + Database db = getDatabases(table.getDbName()); + AtlasEntity dbEntity = toDbEntity(db); if (entityExtInfo != null) { if (dbEntity != null) { @@ -594,8 +594,7 @@ public abstract class BaseHiveEvent { protected AtlasEntity getHiveProcessEntity(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception { AtlasEntity ret = new AtlasEntity(HIVE_TYPE_PROCESS); - HookContext hookContext = getHiveContext(); - String queryStr = hookContext.getQueryPlan().getQueryStr(); + String queryStr = getQueryString(); if (queryStr != null) { queryStr = queryStr.toLowerCase().trim(); @@ -605,12 +604,12 @@ public abstract class BaseHiveEvent { ret.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputs)); ret.setAttribute(ATTRIBUTE_OUTPUTS, getObjectIds(outputs)); ret.setAttribute(ATTRIBUTE_NAME, queryStr); - ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, hookContext.getOperationName()); 
- ret.setAttribute(ATTRIBUTE_START_TIME, hookContext.getQueryPlan().getQueryStartTime()); + ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName()); + ret.setAttribute(ATTRIBUTE_START_TIME, getQueryStartTime()); ret.setAttribute(ATTRIBUTE_END_TIME, System.currentTimeMillis()); ret.setAttribute(ATTRIBUTE_USER_NAME, getUserName()); ret.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr); - ret.setAttribute(ATTRIBUTE_QUERY_ID, hookContext.getQueryPlan().getQuery().getQueryId()); + ret.setAttribute(ATTRIBUTE_QUERY_ID, getQueryId()); ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported"); ret.setAttribute(ATTRIBUTE_RECENT_QUERIES, Collections.singletonList(queryStr)); @@ -621,34 +620,86 @@ public abstract class BaseHiveEvent { return context.getClusterName(); } + protected Database getDatabases(String dbName) throws Exception { + return context.isMetastoreHook() ? context.getMetastoreHandler().get_database(dbName) : + context.getHive().getDatabase(dbName); + } + protected Hive getHive() { return context.getHive(); } - protected HookContext getHiveContext() { - return context.getHiveContext(); + protected Set<ReadEntity> getInputs() { + return context != null ? context.getInputs() : Collections.emptySet(); } - protected String getUserName() { - String ret = getHiveContext().getUserName(); + protected Set<WriteEntity> getOutputs() { + return context != null ? context.getOutputs() : Collections.emptySet(); + } - if (StringUtils.isEmpty(ret)) { - UserGroupInformation ugi = getHiveContext().getUgi(); + protected LineageInfo getLineageInfo() { + return context != null ? context.getLineageInfo() : null; + } + + protected String getQueryString() { + return isHiveContextValid() ? context.getHiveContext().getQueryPlan().getQueryStr() : null; + } + + protected String getOperationName() { + return isHiveContextValid() ? context.getHiveContext().getOperationName() : null; + } + + protected String getHiveUserName() { + return isHiveContextValid() ? 
context.getHiveContext().getUserName() : null; + } + + protected UserGroupInformation getUgi() { + return isHiveContextValid() ? context.getHiveContext().getUgi() : null; + } + + protected Long getQueryStartTime() { + return isHiveContextValid() ? context.getHiveContext().getQueryPlan().getQueryStartTime() : null; + } + + protected String getQueryId() { + return isHiveContextValid() ? context.getHiveContext().getQueryPlan().getQueryId() : null; + } + + private boolean isHiveContextValid() { + return context != null && context.getHiveContext() != null; + } + + protected String getUserName() { + String ret = null; + UserGroupInformation ugi = null; - if (ugi != null) { - ret = ugi.getShortUserName(); + if (context.isMetastoreHook()) { + try { + ugi = SecurityUtils.getUGI(); + } catch (Exception e) { + //do nothing } + } else { + ret = getHiveUserName(); if (StringUtils.isEmpty(ret)) { - try { - ret = UserGroupInformation.getCurrentUser().getShortUserName(); - } catch (IOException e) { - LOG.warn("Failed for UserGroupInformation.getCurrentUser() ", e); - ret = System.getProperty("user.name"); - } + ugi = getUgi(); } } + if (ugi != null) { + ret = ugi.getShortUserName(); + } + + if (StringUtils.isEmpty(ret)) { + try { + ret = UserGroupInformation.getCurrentUser().getShortUserName(); + } catch (IOException e) { + LOG.warn("Failed for UserGroupInformation.getCurrentUser() ", e); + + ret = System.getProperty("user.name"); + } + } return ret; } @@ -757,7 +808,7 @@ public abstract class BaseHiveEvent { operation == HiveOperation.CREATEVIEW || operation == HiveOperation.ALTERVIEW_AS || operation == HiveOperation.ALTERTABLE_LOCATION) { - List<? extends Entity> sortedEntities = new ArrayList<>(getHiveContext().getOutputs()); + List<? 
extends Entity> sortedEntities = new ArrayList<>(getOutputs()); Collections.sort(sortedEntities, entityComparator); @@ -774,15 +825,23 @@ public abstract class BaseHiveEvent { } } - StringBuilder sb = new StringBuilder(getHiveContext().getOperationName()); + String qualifiedName = null; + String operationName = getOperationName(); - boolean ignoreHDFSPaths = ignoreHDFSPathsinProcessQualifiedName(); + if (operationName != null) { + StringBuilder sb = new StringBuilder(operationName); - addToProcessQualifiedName(sb, getHiveContext().getInputs(), ignoreHDFSPaths); - sb.append("->"); - addToProcessQualifiedName(sb, getHiveContext().getOutputs(), ignoreHDFSPaths); + boolean ignoreHDFSPaths = ignoreHDFSPathsinProcessQualifiedName(); - return sb.toString(); + addToProcessQualifiedName(sb, getInputs(), ignoreHDFSPaths); + sb.append("->"); + addToProcessQualifiedName(sb, getOutputs(), ignoreHDFSPaths); + + qualifiedName = sb.toString(); + } + + + return qualifiedName; } protected AtlasEntity toReferencedHBaseTable(Table table, AtlasEntitiesWithExtInfo entities) { @@ -836,9 +895,9 @@ public abstract class BaseHiveEvent { switch (context.getHiveOperation()) { case LOAD: case IMPORT: - return hasPartitionEntity(getHiveContext().getOutputs()); + return hasPartitionEntity(getOutputs()); case EXPORT: - return hasPartitionEntity(getHiveContext().getInputs()); + return hasPartitionEntity(getInputs()); case QUERY: return true; } @@ -1006,4 +1065,8 @@ public abstract class BaseHiveEvent { return hbaseTableName; } } + + public static Table toTable(org.apache.hadoop.hive.metastore.api.Table table) { + return new Table(table); + } } diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateDatabase.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateDatabase.java index d017dbe..b01f61f 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateDatabase.java +++ 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateDatabase.java @@ -25,6 +25,7 @@ import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2; import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; import org.apache.hadoop.hive.ql.hooks.Entity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +33,8 @@ import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; +import static org.apache.hadoop.hive.ql.hooks.Entity.Type.DATABASE; + public class CreateDatabase extends BaseHiveEvent { private static final Logger LOG = LoggerFactory.getLogger(CreateDatabase.class); @@ -42,7 +45,7 @@ public class CreateDatabase extends BaseHiveEvent { @Override public List<HookNotification> getNotificationMessages() throws Exception { List<HookNotification> ret = null; - AtlasEntitiesWithExtInfo entities = getEntities(); + AtlasEntitiesWithExtInfo entities = context.isMetastoreHook() ? 
getHiveMetastoreEntities() : getHiveEntities(); if (entities != null && CollectionUtils.isNotEmpty(entities.getEntities())) { ret = Collections.singletonList(new EntityCreateRequestV2(getUserName(), entities)); @@ -51,11 +54,29 @@ public class CreateDatabase extends BaseHiveEvent { return ret; } - public AtlasEntitiesWithExtInfo getEntities() throws Exception { + public AtlasEntitiesWithExtInfo getHiveMetastoreEntities() throws Exception { + AtlasEntitiesWithExtInfo ret = new AtlasEntitiesWithExtInfo(); + CreateDatabaseEvent dbEvent = (CreateDatabaseEvent) context.getMetastoreEvent(); + Database db = dbEvent.getDatabase(); + + if (db != null) { + AtlasEntity dbEntity = toDbEntity(db); + + ret.addEntity(dbEntity); + } else { + LOG.error("CreateDatabase.getEntities(): failed to retrieve db"); + } + + addProcessedEntities(ret); + + return ret; + } + + public AtlasEntitiesWithExtInfo getHiveEntities() throws Exception { AtlasEntitiesWithExtInfo ret = new AtlasEntitiesWithExtInfo(); - for (Entity entity : getHiveContext().getOutputs()) { - if (entity.getType() == Entity.Type.DATABASE) { + for (Entity entity : getOutputs()) { + if (entity.getType() == DATABASE) { Database db = entity.getDatabase(); if (db != null) { @@ -76,4 +97,4 @@ public class CreateDatabase extends BaseHiveEvent { return ret; } -} +} \ No newline at end of file diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java index d61f1d7..019deb7 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java @@ -72,13 +72,12 @@ public class CreateHiveProcess extends BaseHiveEvent { if (!skipProcess()) { List<AtlasEntity> inputs = new ArrayList<>(); List<AtlasEntity> outputs = new ArrayList<>(); - HookContext hiveContext = 
getHiveContext(); Set<String> processedNames = new HashSet<>(); ret = new AtlasEntitiesWithExtInfo(); - if (hiveContext.getInputs() != null) { - for (ReadEntity input : hiveContext.getInputs()) { + if (getInputs() != null) { + for (ReadEntity input : getInputs()) { String qualifiedName = getQualifiedName(input); if (qualifiedName == null || !processedNames.add(qualifiedName)) { @@ -97,8 +96,8 @@ public class CreateHiveProcess extends BaseHiveEvent { } } - if (hiveContext.getOutputs() != null) { - for (WriteEntity output : hiveContext.getOutputs()) { + if (getOutputs() != null) { + for (WriteEntity output : getOutputs()) { String qualifiedName = getQualifiedName(output); if (qualifiedName == null || !processedNames.add(qualifiedName)) { @@ -130,7 +129,7 @@ public class CreateHiveProcess extends BaseHiveEvent { } private void processColumnLineage(AtlasEntity hiveProcess, AtlasEntitiesWithExtInfo entities) { - LineageInfo lineageInfo = getHiveContext().getLinfo(); + LineageInfo lineageInfo = getLineageInfo(); if (lineageInfo == null || CollectionUtils.isEmpty(lineageInfo.entrySet())) { return; @@ -235,8 +234,8 @@ public class CreateHiveProcess extends BaseHiveEvent { private boolean skipProcess() { - Set<ReadEntity> inputs = getHiveContext().getInputs(); - Set<WriteEntity> outputs = getHiveContext().getOutputs(); + Set<ReadEntity> inputs = getInputs(); + Set<WriteEntity> outputs = getOutputs(); boolean ret = CollectionUtils.isEmpty(inputs) && CollectionUtils.isEmpty(outputs); diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java index 674a89f..b6ec5c3 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java @@ -24,14 +24,19 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import 
org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2; import org.apache.commons.collections.CollectionUtils; -import org.apache.hadoop.hive.metastore.TableType; -import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.events.AlterTableEvent; +import org.apache.hadoop.hive.metastore.events.CreateTableEvent; +import org.apache.hadoop.hive.metastore.events.ListenerEvent; import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.HiveOperation; import java.util.Collections; import java.util.List; +import static org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.*; + public class CreateTable extends BaseHiveEvent { private final boolean skipTempTables; @@ -44,7 +49,7 @@ public class CreateTable extends BaseHiveEvent { @Override public List<HookNotification> getNotificationMessages() throws Exception { List<HookNotification> ret = null; - AtlasEntitiesWithExtInfo entities = getEntities(); + AtlasEntitiesWithExtInfo entities = context.isMetastoreHook() ? 
getHiveMetastoreEntities() : getHiveEntities(); if (entities != null && CollectionUtils.isNotEmpty(entities.getEntities())) { ret = Collections.singletonList(new EntityCreateRequestV2(getUserName(), entities)); @@ -53,31 +58,62 @@ public class CreateTable extends BaseHiveEvent { return ret; } - public AtlasEntitiesWithExtInfo getEntities() throws Exception { + public AtlasEntitiesWithExtInfo getHiveMetastoreEntities() throws Exception { AtlasEntitiesWithExtInfo ret = new AtlasEntitiesWithExtInfo(); - Database db = null; - Table table = null; + ListenerEvent event = context.getMetastoreEvent(); + HiveOperation oper = context.getHiveOperation(); + Table table; + + if (isAlterTable(oper)) { + table = toTable(((AlterTableEvent) event).getNewTable()); + } else { + table = toTable(((CreateTableEvent) event).getTable()); + } + + if (skipTemporaryTable(table)) { + table = null; + } + + processTable(table, ret); - for (Entity entity : getHiveContext().getOutputs()) { - if (entity.getType() == Entity.Type.TABLE) { - table = entity.getTable(); + addProcessedEntities(ret); + + return ret; + } + + public AtlasEntitiesWithExtInfo getHiveEntities() throws Exception { + AtlasEntitiesWithExtInfo ret = new AtlasEntitiesWithExtInfo(); + Table table = null; - if (table != null) { - db = getHive().getDatabase(table.getDbName()); - table = getHive().getTable(table.getDbName(), table.getTableName()); + if (CollectionUtils.isNotEmpty(getOutputs())) { + for (Entity entity : getOutputs()) { + if (entity.getType() == Entity.Type.TABLE) { + table = entity.getTable(); if (table != null) { - // If its an external table, even though the temp table skip flag is on, we create the table since we need the HDFS path to temp table lineage. 
- if (skipTempTables && table.isTemporary() && !TableType.EXTERNAL_TABLE.equals(table.getTableType())) { - table = null; - } else { - break; + table = getHive().getTable(table.getDbName(), table.getTableName()); + + if (table != null) { + if (skipTemporaryTable(table)) { + table = null; + } else { + break; + } } } } } } + processTable(table, ret); + + addProcessedEntities(ret); + + return ret; + } + + // create process entities for lineages from HBase/HDFS to hive table + private void processTable(Table table, AtlasEntitiesWithExtInfo ret) throws Exception { if (table != null) { AtlasEntity tblEntity = toTableEntity(table, ret); @@ -89,7 +125,7 @@ public class CreateTable extends BaseHiveEvent { if (hbaseTableEntity != null) { final AtlasEntity processEntity; - if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) { + if (EXTERNAL_TABLE.equals(table.getTableType())) { processEntity = getHiveProcessEntity(Collections.singletonList(hbaseTableEntity), Collections.singletonList(tblEntity)); } else { processEntity = getHiveProcessEntity(Collections.singletonList(tblEntity), Collections.singletonList(hbaseTableEntity)); @@ -98,7 +134,7 @@ public class CreateTable extends BaseHiveEvent { ret.addEntity(processEntity); } } else { - if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) { + if (EXTERNAL_TABLE.equals(table.getTableType())) { AtlasEntity hdfsPathEntity = getPathEntity(table.getDataLocation(), ret); AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity)); @@ -108,9 +144,14 @@ public class CreateTable extends BaseHiveEvent { } } } + } - addProcessedEntities(ret); + private static boolean isAlterTable(HiveOperation oper) { + return (oper == ALTERTABLE_PROPERTIES || oper == ALTERTABLE_RENAME || oper == ALTERTABLE_RENAMECOL); + } - return ret; + private boolean skipTemporaryTable(Table table) { + // If its an external table, even though the temp table skip flag is on, we create the 
table since we need the HDFS path to temp table lineage. + return table != null && skipTempTables && table.isTemporary() && !EXTERNAL_TABLE.equals(table.getTableType()); } } diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropDatabase.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropDatabase.java index 1795bf2..20019d2 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropDatabase.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropDatabase.java @@ -23,21 +23,25 @@ import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2; import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; import org.apache.hadoop.hive.ql.hooks.Entity; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import static org.apache.hadoop.hive.ql.hooks.Entity.Type.DATABASE; +import static org.apache.hadoop.hive.ql.hooks.Entity.Type.TABLE; + public class DropDatabase extends BaseHiveEvent { public DropDatabase(AtlasHiveHookContext context) { super(context); } @Override - public List<HookNotification> getNotificationMessages() throws Exception { + public List<HookNotification> getNotificationMessages() { List<HookNotification> ret = null; - List<AtlasObjectId> entities = getEntities(); + List<AtlasObjectId> entities = context.isMetastoreHook() ? 
getHiveMetastoreEntities() : getHiveEntities(); if (CollectionUtils.isNotEmpty(entities)) { ret = new ArrayList<>(entities.size()); @@ -50,27 +54,40 @@ public class DropDatabase extends BaseHiveEvent { return ret; } - public List<AtlasObjectId> getEntities() throws Exception { + private List<AtlasObjectId> getHiveMetastoreEntities() { + List<AtlasObjectId> ret = new ArrayList<>(); + DropDatabaseEvent dbEvent = (DropDatabaseEvent) context.getMetastoreEvent(); + String dbQName = getQualifiedName(dbEvent.getDatabase()); + AtlasObjectId dbId = new AtlasObjectId(HIVE_TYPE_DB, ATTRIBUTE_QUALIFIED_NAME, dbQName); + + context.removeFromKnownDatabase(dbQName); + + ret.add(dbId); + + return ret; + } + + private List<AtlasObjectId> getHiveEntities() { List<AtlasObjectId> ret = new ArrayList<>(); - for (Entity entity : getHiveContext().getOutputs()) { - if (entity.getType() == Entity.Type.DATABASE) { + for (Entity entity : getOutputs()) { + if (entity.getType() == DATABASE) { String dbQName = getQualifiedName(entity.getDatabase()); AtlasObjectId dbId = new AtlasObjectId(HIVE_TYPE_DB, ATTRIBUTE_QUALIFIED_NAME, dbQName); context.removeFromKnownDatabase(dbQName); ret.add(dbId); - } else if (entity.getType() == Entity.Type.TABLE) { + } else if (entity.getType() == TABLE) { String tblQName = getQualifiedName(entity.getTable()); - AtlasObjectId dbId = new AtlasObjectId(HIVE_TYPE_TABLE, ATTRIBUTE_QUALIFIED_NAME, tblQName); + AtlasObjectId tblId = new AtlasObjectId(HIVE_TYPE_TABLE, ATTRIBUTE_QUALIFIED_NAME, tblQName); context.removeFromKnownTable(tblQName); - ret.add(dbId); + ret.add(tblId); } } return ret; } -} +} \ No newline at end of file diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropTable.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropTable.java index a0f7d8a..440c08a 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropTable.java +++ 
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropTable.java @@ -23,7 +23,9 @@ import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2; import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.hive.metastore.events.DropTableEvent; import org.apache.hadoop.hive.ql.hooks.Entity; +import org.apache.hadoop.hive.ql.metadata.Table; import java.util.ArrayList; import java.util.Collections; @@ -35,9 +37,9 @@ public class DropTable extends BaseHiveEvent { } @Override - public List<HookNotification> getNotificationMessages() throws Exception { + public List<HookNotification> getNotificationMessages() { List<HookNotification> ret = null; - List<AtlasObjectId> entities = getEntities(); + List<AtlasObjectId> entities = context.isMetastoreHook() ? getHiveMetastoreEntities() : getHiveEntities(); if (CollectionUtils.isNotEmpty(entities)) { ret = new ArrayList<>(entities.size()); @@ -50,20 +52,34 @@ public class DropTable extends BaseHiveEvent { return ret; } - public List<AtlasObjectId> getEntities() throws Exception { + public List<AtlasObjectId> getHiveMetastoreEntities() { + List<AtlasObjectId> ret = new ArrayList<>(); + DropTableEvent tblEvent = (DropTableEvent) context.getMetastoreEvent(); + Table table = new Table(tblEvent.getTable()); + String tblQName = getQualifiedName(table); + AtlasObjectId tblId = new AtlasObjectId(HIVE_TYPE_TABLE, ATTRIBUTE_QUALIFIED_NAME, tblQName); + + context.removeFromKnownTable(tblQName); + + ret.add(tblId); + + return ret; + } + + public List<AtlasObjectId> getHiveEntities() { List<AtlasObjectId> ret = new ArrayList<>(); - for (Entity entity : getHiveContext().getOutputs()) { + for (Entity entity : getOutputs()) { if (entity.getType() == Entity.Type.TABLE) { String tblQName = getQualifiedName(entity.getTable()); - AtlasObjectId dbId = new 
AtlasObjectId(HIVE_TYPE_TABLE, ATTRIBUTE_QUALIFIED_NAME, tblQName); + AtlasObjectId tblId = new AtlasObjectId(HIVE_TYPE_TABLE, ATTRIBUTE_QUALIFIED_NAME, tblQName); context.removeFromKnownTable(tblQName); - ret.add(dbId); + ret.add(tblId); } } return ret; } -} +} \ No newline at end of file