This is an automated email from the ASF dual-hosted git repository. sarath pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 5263a6b Revert "ATLAS-3148: Implement Hive Metastore hook for Atlas" - change not included in branch-2.0 5263a6b is described below commit 5263a6bc3ae05665949870f9e13a5a6733e08e82 Author: Sarath Subramanian <ssubraman...@hortonworks.com> AuthorDate: Wed Apr 17 17:03:13 2019 -0700 Revert "ATLAS-3148: Implement Hive Metastore hook for Atlas" - change not included in branch-2.0 This reverts commit 97f6f7ca3da96c22174dd573a6b2a3cc1db51b62. --- .../apache/atlas/hive/hook/HiveMetastoreHook.java | 199 --------------------- .../atlas/hive/hook/AtlasHiveHookContext.java | 134 +++----------- .../java/org/apache/atlas/hive/hook/HiveHook.java | 21 +-- .../atlas/hive/hook/HiveMetastoreHookImpl.java | 193 -------------------- .../atlas/hive/hook/HiveOperationContext.java | 72 -------- .../atlas/hive/hook/events/AlterDatabase.java | 34 +--- .../apache/atlas/hive/hook/events/AlterTable.java | 4 +- .../atlas/hive/hook/events/AlterTableRename.java | 52 ++---- .../hive/hook/events/AlterTableRenameCol.java | 88 +++------ .../atlas/hive/hook/events/BaseHiveEvent.java | 127 ++++--------- .../atlas/hive/hook/events/CreateDatabase.java | 31 +--- .../atlas/hive/hook/events/CreateHiveProcess.java | 15 +- .../apache/atlas/hive/hook/events/CreateTable.java | 83 +++------ .../atlas/hive/hook/events/DropDatabase.java | 35 +--- .../apache/atlas/hive/hook/events/DropTable.java | 30 +--- 15 files changed, 161 insertions(+), 957 deletions(-) diff --git a/addons/hive-bridge-shim/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHook.java b/addons/hive-bridge-shim/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHook.java deleted file mode 100644 index 2894e99..0000000 --- a/addons/hive-bridge-shim/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHook.java +++ /dev/null @@ -1,199 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.hive.hook; - -import org.apache.atlas.plugin.classloader.AtlasPluginClassLoader; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.MetaStoreEventListener; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.events.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Hive Metastore hook to capture DDL operations for atlas entity registration. - */ -public class HiveMetastoreHook extends MetaStoreEventListener { - private static final String ATLAS_PLUGIN_TYPE = "hive"; - private static final String ATLAS_HIVE_METASTORE_HOOK_IMPL_CLASSNAME = "org.apache.atlas.hive.hook.HiveMetastoreHookImpl"; - public static final Logger LOG = LoggerFactory.getLogger(HiveMetastoreHook.class); - - private AtlasPluginClassLoader atlasPluginClassLoader = null; - private MetaStoreEventListener atlasMetastoreHookImpl = null; - private Configuration config; - - public HiveMetastoreHook(Configuration config) { - super(config); - - this.config = config; - - this.initialize(); - } - - private void initialize() { - if (LOG.isDebugEnabled()) { - LOG.debug("==> HiveMetastoreHook.initialize()"); - } - - try { - atlasPluginClassLoader = AtlasPluginClassLoader.getInstance(ATLAS_PLUGIN_TYPE, this.getClass()); - - @SuppressWarnings("unchecked") - Class<MetaStoreEventListener> cls = (Class<MetaStoreEventListener>) - Class.forName(ATLAS_HIVE_METASTORE_HOOK_IMPL_CLASSNAME, true, atlasPluginClassLoader); - - activatePluginClassLoader(); - - atlasMetastoreHookImpl = cls.getDeclaredConstructor(Configuration.class).newInstance(config); - } catch (Exception ex) { - LOG.error("Error instantiating Atlas hook implementation", ex); - } finally { - deactivatePluginClassLoader(); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== HiveMetastoreHook.initialize()"); - } - } - - @Override - public void onCreateTable(CreateTableEvent tableEvent) throws MetaException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> HiveMetastoreHook.onCreateTable()"); - } - - try { - activatePluginClassLoader(); - - atlasMetastoreHookImpl.onCreateTable(tableEvent); - } finally { - deactivatePluginClassLoader(); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== HiveMetastoreHook.onCreateTable()"); - } - } - - @Override - public void onDropTable(DropTableEvent tableEvent) throws MetaException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> HiveMetastoreHook.onDropTable()"); - } - - try { - activatePluginClassLoader(); - - atlasMetastoreHookImpl.onDropTable(tableEvent); - } finally { - deactivatePluginClassLoader(); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== HiveMetastoreHook.onDropTable()"); - } - } - - @Override - public void onAlterTable(AlterTableEvent tableEvent) throws MetaException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> HiveMetastoreHook.onAlterTable()"); - } - - try { - activatePluginClassLoader(); - - atlasMetastoreHookImpl.onAlterTable(tableEvent); - } finally { - deactivatePluginClassLoader(); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== HiveMetastoreHook.onAlterTable()"); - } - } - - @Override - public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> HiveMetastoreHook.onCreateDatabase()"); - } - - try { - activatePluginClassLoader(); - - atlasMetastoreHookImpl.onCreateDatabase(dbEvent); - } finally { - deactivatePluginClassLoader(); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== HiveMetastoreHook.onCreateDatabase()"); - } - } - - @Override - public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> HiveMetastoreHook.onDropDatabase()"); - } - - try { - activatePluginClassLoader(); - - atlasMetastoreHookImpl.onDropDatabase(dbEvent); - } finally { - deactivatePluginClassLoader(); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== HiveMetastoreHook.onDropDatabase()"); - } - } - - @Override - public void onAlterDatabase(AlterDatabaseEvent dbEvent) throws MetaException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> HiveMetastoreHook.onAlterDatabase()"); - } - - try { - activatePluginClassLoader(); - - atlasMetastoreHookImpl.onAlterDatabase(dbEvent); - } finally { - deactivatePluginClassLoader(); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== HiveMetastoreHook.onAlterDatabase()"); - } - } - - private void activatePluginClassLoader() { - if (atlasPluginClassLoader != null) { - atlasPluginClassLoader.activate(); - } - } - - private void deactivatePluginClassLoader() { - if (atlasPluginClassLoader != null) { - atlasPluginClassLoader.deactivate(); - } - } -} \ No newline at end of file diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java index 52da710..44c6437 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/AtlasHiveHookContext.java @@ -19,25 +19,21 @@ package org.apache.atlas.hive.hook; import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.hive.hook.HiveMetastoreHookImpl.HiveMetastoreHook; import org.apache.atlas.hive.hook.HiveHook.PreprocessAction; import org.apache.atlas.hive.hook.HiveHook.HiveHookObjectNamesCache; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.RandomStringUtils; -import org.apache.hadoop.hive.metastore.IHMSHandler; import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.events.*; import org.apache.hadoop.hive.ql.hooks.HookContext; -import org.apache.hadoop.hive.ql.hooks.LineageInfo; -import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.session.SessionState; -import java.util.*; - -import static org.apache.atlas.hive.hook.events.BaseHiveEvent.toTable; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; public class AtlasHiveHookContext { @@ -45,8 +41,6 @@ public class AtlasHiveHookContext { public static final char QNAME_SEP_ENTITY_NAME = '.'; public static final char QNAME_SEP_PROCESS = ':'; public static final String TEMP_TABLE_PREFIX = "_temp-"; - public static final String CREATE_OPERATION = "CREATE"; - public static final String ALTER_OPERATION = "ALTER"; private final HiveHook hook; private final HiveOperation hiveOperation; @@ -54,58 +48,17 @@ public class AtlasHiveHookContext { private final Hive hive; private final Map<String, AtlasEntity> qNameEntityMap = new HashMap<>(); private final HiveHookObjectNamesCache knownObjects; - private final HiveMetastoreHook metastoreHook; - private final ListenerEvent metastoreEvent; - private final IHMSHandler metastoreHandler; - - public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HookContext hiveContext, - HiveHookObjectNamesCache knownObjects) throws Exception { - this(hook, hiveOperation, hiveContext, knownObjects, null, null); - } - public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HiveHookObjectNamesCache knownObjects, - HiveMetastoreHook metastoreHook, ListenerEvent listenerEvent) throws Exception { - this(hook, hiveOperation, null, knownObjects, metastoreHook, listenerEvent); - } - - public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HookContext hiveContext, HiveHookObjectNamesCache knownObjects, - HiveMetastoreHook metastoreHook, ListenerEvent listenerEvent) throws Exception { - this.hook = hook; - this.hiveOperation = hiveOperation; - this.hiveContext = hiveContext; - this.hive = hiveContext != null ? Hive.get(hiveContext.getConf()) : null; - this.knownObjects = knownObjects; - this.metastoreHook = metastoreHook; - this.metastoreEvent = listenerEvent; - this.metastoreHandler = (listenerEvent != null) ? metastoreEvent.getIHMSHandler() : null; + public AtlasHiveHookContext(HiveHook hook, HiveOperation hiveOperation, HookContext hiveContext, HiveHookObjectNamesCache knownObjects) throws Exception { + this.hook = hook; + this.hiveOperation = hiveOperation; + this.hiveContext = hiveContext; + this.hive = Hive.get(hiveContext.getConf()); + this.knownObjects = knownObjects; init(); } - public boolean isMetastoreHook() { - return metastoreHook != null; - } - - public ListenerEvent getMetastoreEvent() { - return metastoreEvent; - } - - public IHMSHandler getMetastoreHandler() { - return metastoreHandler; - } - - public Set<ReadEntity> getInputs() { - return hiveContext != null ? hiveContext.getInputs() : Collections.emptySet(); - } - - public Set<WriteEntity> getOutputs() { - return hiveContext != null ? hiveContext.getOutputs() : Collections.emptySet(); - } - - public LineageInfo getLineageInfo() { - return hiveContext != null ? hiveContext.getLinfo() : null; - } - public HookContext getHiveContext() { return hiveContext; } @@ -194,59 +147,24 @@ public class AtlasHiveHookContext { } private void init() { - String operation = hiveOperation.getOperationName(); - - if (knownObjects == null || !isCreateAlterOperation(operation)) { - return; - } - - List<Database> databases = new ArrayList<>(); - List<Table> tables = new ArrayList<>(); - - if (isMetastoreHook()) { - switch (hiveOperation) { - case CREATEDATABASE: - databases.add(((CreateDatabaseEvent) metastoreEvent).getDatabase()); - break; - case ALTERDATABASE: - databases.add(((AlterDatabaseEvent) metastoreEvent).getOldDatabase()); - databases.add(((AlterDatabaseEvent) metastoreEvent).getNewDatabase()); - break; - case CREATETABLE: - tables.add(toTable(((CreateTableEvent) metastoreEvent).getTable())); - break; - case ALTERTABLE_PROPERTIES: - case ALTERTABLE_RENAME: - case ALTERTABLE_RENAMECOL: - tables.add(toTable(((AlterTableEvent) metastoreEvent).getOldTable())); - tables.add(toTable(((AlterTableEvent) metastoreEvent).getNewTable())); - break; - } - } else { - if (getOutputs() != null) { - for (WriteEntity output : hiveContext.getOutputs()) { - switch (output.getType()) { - case DATABASE: - databases.add(output.getDatabase()); - break; - case TABLE: - tables.add(output.getTable()); - break; + if (knownObjects != null) { + String operationName = hiveContext.getOperationName(); + + if (operationName != null && operationName.startsWith("CREATE") || operationName.startsWith("ALTER")) { + if (CollectionUtils.isNotEmpty(hiveContext.getOutputs())) { + for (WriteEntity output : hiveContext.getOutputs()) { + switch (output.getType()) { + case DATABASE: + knownObjects.removeFromKnownDatabase(getQualifiedName(output.getDatabase())); + break; + + case TABLE: + knownObjects.removeFromKnownTable(getQualifiedName(output.getTable())); + break; + } } } } } - - for (Database database : databases) { - knownObjects.removeFromKnownDatabase(getQualifiedName(database)); - } - - for (Table table : tables) { - knownObjects.removeFromKnownTable(getQualifiedName(table)); - } - } - - private static boolean isCreateAlterOperation(String operationName) { - return operationName != null && operationName.startsWith(CREATE_OPERATION) || operationName.startsWith(ALTER_OPERATION); } -} \ No newline at end of file +} diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index 6109297..ee02285 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -146,10 +146,17 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { LOG.debug("==> HiveHook.run({})", hookContext.getOperationName()); } + if (knownObjects != null && knownObjects.isCacheExpired()) { + LOG.info("HiveHook.run(): purging cached databaseNames ({}) and tableNames ({})", knownObjects.getCachedDbCount(), knownObjects.getCachedTableCount()); + + knownObjects = new HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds); + } + try { HiveOperation oper = OPERATION_MAP.get(hookContext.getOperationName()); - AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext, getKnownObjects()); - BaseHiveEvent event = null; + AtlasHiveHookContext context = new AtlasHiveHookContext(this, oper, hookContext, knownObjects); + + BaseHiveEvent event = null; switch (oper) { case CREATEDATABASE: @@ -162,7 +169,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { case ALTERDATABASE: case ALTERDATABASE_OWNER: - case ALTERDATABASE_LOCATION: event = new AlterDatabase(context); break; @@ -282,15 +288,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { return ret; } - public static HiveHookObjectNamesCache getKnownObjects() { - if (knownObjects != null && knownObjects.isCacheExpired()) { - LOG.info("HiveHook.run(): purging cached databaseNames ({}) and tableNames ({})", knownObjects.getCachedDbCount(), knownObjects.getCachedTableCount()); - - knownObjects = new HiveHook.HiveHookObjectNamesCache(nameCacheDatabaseMaxCount, nameCacheTableMaxCount, nameCacheRebuildIntervalSeconds); - } - - return knownObjects; - } public static class HiveHookObjectNamesCache { private final int dbMaxCacheCount; diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java deleted file mode 100644 index 251fddd..0000000 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveMetastoreHookImpl.java +++ /dev/null @@ -1,193 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.hive.hook; - -import org.apache.atlas.hive.hook.events.*; -import org.apache.atlas.hook.AtlasHook; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.MetaStoreEventListener; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.events.*; -import org.apache.hadoop.hive.metastore.utils.SecurityUtils; -import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.plan.HiveOperation; -import org.apache.hadoop.hive.shims.Utils; -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -import static org.apache.atlas.hive.hook.events.AlterTableRenameCol.findRenamedColumn; -import static org.apache.atlas.hive.hook.events.BaseHiveEvent.toTable; -import static org.apache.hadoop.hive.ql.plan.HiveOperation.*; - -public class HiveMetastoreHookImpl extends MetaStoreEventListener { - private static final Logger LOG = LoggerFactory.getLogger(HiveMetastoreHookImpl.class); - private final HiveHook hiveHook; - private final HiveMetastoreHook hook; - - public HiveMetastoreHookImpl(Configuration config) { - super(config); - - this.hiveHook = new HiveHook(); - this.hook = new HiveMetastoreHook(); - } - - @Override - public void onCreateDatabase(CreateDatabaseEvent dbEvent) { - HiveOperationContext context = new HiveOperationContext(CREATEDATABASE, dbEvent); - - hook.handleEvent(context); - } - - @Override - public void onDropDatabase(DropDatabaseEvent dbEvent) { - HiveOperationContext context = new HiveOperationContext(DROPDATABASE, dbEvent); - - hook.handleEvent(context); - } - - @Override - public void onAlterDatabase(AlterDatabaseEvent dbEvent) { - HiveOperationContext context = new HiveOperationContext(ALTERDATABASE, dbEvent); - - hook.handleEvent(context); - } - - @Override - public void onCreateTable(CreateTableEvent tableEvent) { - HiveOperationContext context = new HiveOperationContext(CREATETABLE, tableEvent); - - hook.handleEvent(context); - } - - @Override - public void onDropTable(DropTableEvent tableEvent) { - HiveOperationContext context = new HiveOperationContext(DROPTABLE, tableEvent); - - hook.handleEvent(context); - } - - @Override - public void onAlterTable(AlterTableEvent tableEvent) { - HiveOperationContext context = new HiveOperationContext(tableEvent); - Table oldTable = toTable(tableEvent.getOldTable()); - Table newTable = toTable(tableEvent.getNewTable()); - - if (isTableRename(oldTable, newTable)) { - context.setOperation(ALTERTABLE_RENAME); - } else if (isColumnRename(oldTable, newTable, context)) { - context.setOperation(ALTERTABLE_RENAMECOL); - } else { - context.setOperation(ALTERTABLE_PROPERTIES); // map other alter table operations to ALTERTABLE_PROPERTIES - } - - hook.handleEvent(context); - } - - public class HiveMetastoreHook extends AtlasHook { - public HiveMetastoreHook() { - } - - public void handleEvent(HiveOperationContext operContext) { - ListenerEvent listenerEvent = operContext.getEvent(); - - if (!listenerEvent.getStatus()) { - return; - } - - try { - HiveOperation oper = operContext.getOperation(); - AtlasHiveHookContext context = new AtlasHiveHookContext(hiveHook, oper, hiveHook.getKnownObjects(), this, listenerEvent); - BaseHiveEvent event = null; - - switch (oper) { - case CREATEDATABASE: - event = new CreateDatabase(context); - break; - - case DROPDATABASE: - event = new DropDatabase(context); - break; - - case ALTERDATABASE: - event = new AlterDatabase(context); - break; - - case CREATETABLE: - event = new CreateTable(context, true); - break; - - case DROPTABLE: - event = new DropTable(context); - break; - - case ALTERTABLE_PROPERTIES: - event = new AlterTable(context); - break; - - case ALTERTABLE_RENAME: - event = new AlterTableRename(context); - break; - - case ALTERTABLE_RENAMECOL: - FieldSchema columnOld = operContext.getColumnOld(); - FieldSchema columnNew = operContext.getColumnNew(); - - event = new AlterTableRenameCol(columnOld, columnNew, context); - break; - - default: - if (LOG.isDebugEnabled()) { - LOG.debug("HiveMetastoreHook.handleEvent({}): operation ignored.", listenerEvent); - } - break; - } - - if (event != null) { - final UserGroupInformation ugi = SecurityUtils.getUGI() == null ? Utils.getUGI() : SecurityUtils.getUGI(); - - super.notifyEntities(event.getNotificationMessages(), ugi); - } - } catch (Throwable t) { - LOG.error("HiveMetastoreHook.handleEvent({}): failed to process operation {}", listenerEvent, t); - } - } - } - - private static boolean isTableRename(Table oldTable, Table newTable) { - String oldTableName = oldTable.getTableName(); - String newTableName = newTable.getTableName(); - - return !StringUtils.equalsIgnoreCase(oldTableName, newTableName); - } - - private static boolean isColumnRename(Table oldTable, Table newTable, HiveOperationContext context) { - FieldSchema columnOld = findRenamedColumn(oldTable, newTable); - FieldSchema columnNew = findRenamedColumn(newTable, oldTable); - boolean isColumnRename = columnOld != null && columnNew != null; - - if (isColumnRename) { - context.setColumnOld(columnOld); - context.setColumnNew(columnNew); - } - - return isColumnRename; - } -} \ No newline at end of file diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveOperationContext.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveOperationContext.java deleted file mode 100644 index 23ea4be..0000000 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveOperationContext.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.hive.hook; - -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.events.ListenerEvent; -import org.apache.hadoop.hive.ql.plan.HiveOperation; - -public class HiveOperationContext { - HiveOperation operation; - ListenerEvent event; - FieldSchema columnOld; - FieldSchema columnNew; - - public HiveOperationContext(ListenerEvent event) { - this(null, event); - } - - public HiveOperationContext(HiveOperation operation, ListenerEvent event) { - setOperation(operation); - setEvent(event); - setColumnOld(null); - setColumnNew(null); - } - - public ListenerEvent getEvent() { - return event; - } - - public void setEvent(ListenerEvent event) { - this.event = event; - } - - public HiveOperation getOperation() { - return operation; - } - - public void setOperation(HiveOperation operation) { - this.operation = operation; - } - - public FieldSchema getColumnOld() { - return columnOld; - } - - public void setColumnOld(FieldSchema columnOld) { - this.columnOld = columnOld; - } - - public FieldSchema getColumnNew() { - return columnNew; - } - - public void setColumnNew(FieldSchema columnNew) { - this.columnNew = columnNew; - } -} diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterDatabase.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterDatabase.java index 6b01c4e..6808574 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterDatabase.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterDatabase.java @@ -19,22 +19,15 @@ package org.apache.atlas.hive.hook.events; import org.apache.atlas.hive.hook.AtlasHiveHookContext; -import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2; import org.apache.commons.collections.CollectionUtils; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.events.AlterDatabaseEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; public class AlterDatabase extends CreateDatabase { - private static final Logger LOG = LoggerFactory.getLogger(AlterDatabase.class); - public AlterDatabase(AtlasHiveHookContext context) { super(context); } @@ -42,7 +35,7 @@ public class AlterDatabase extends CreateDatabase { @Override public List<HookNotification> getNotificationMessages() throws Exception { List<HookNotification> ret = null; - AtlasEntitiesWithExtInfo entities = context.isMetastoreHook() ? getHiveMetastoreEntities() : getHiveEntities(); + AtlasEntitiesWithExtInfo entities = getEntities(); if (entities != null && CollectionUtils.isNotEmpty(entities.getEntities())) { ret = Collections.singletonList(new EntityUpdateRequestV2(getUserName(), entities)); @@ -50,27 +43,4 @@ public class AlterDatabase extends CreateDatabase { return ret; } - - public AtlasEntitiesWithExtInfo getHiveMetastoreEntities() throws Exception { - AtlasEntitiesWithExtInfo ret = new AtlasEntitiesWithExtInfo(); - AlterDatabaseEvent dbEvent = (AlterDatabaseEvent) context.getMetastoreEvent(); - Database oldDb = dbEvent.getOldDatabase(); - Database newDb = dbEvent.getNewDatabase(); - - if (newDb != null) { - AtlasEntity dbEntity = toDbEntity(newDb); - - ret.addEntity(dbEntity); - } else { - LOG.error("AlterDatabase.getEntities(): failed to retrieve db"); - } - - addProcessedEntities(ret); - - return ret; - } - - public AtlasEntitiesWithExtInfo getHiveEntities() throws Exception { - return super.getHiveEntities(); - } -} \ No newline at end of file +} diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTable.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTable.java index e164370..adad81a 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTable.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTable.java @@ -35,7 +35,7 @@ public class AlterTable extends CreateTable { @Override public List<HookNotification> getNotificationMessages() throws Exception { List<HookNotification> ret = null; - AtlasEntitiesWithExtInfo entities = context.isMetastoreHook() ? getHiveMetastoreEntities() : getHiveEntities(); + AtlasEntitiesWithExtInfo entities = getEntities(); if (entities != null && CollectionUtils.isNotEmpty(entities.getEntities())) { ret = Collections.singletonList(new EntityUpdateRequestV2(getUserName(), entities)); @@ -43,4 +43,4 @@ public class AlterTable extends CreateTable { return ret; } -} \ No newline at end of file +} diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java index d3d8349..7e15d0e 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRename.java @@ -29,7 +29,6 @@ import org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateR import org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Table; @@ -50,48 +49,24 @@ public class AlterTableRename extends BaseHiveEvent { @Override public List<HookNotification> getNotificationMessages() throws Exception { - return context.isMetastoreHook() ? getHiveMetastoreMessages() : getHiveMessages(); - } - - public List<HookNotification> getHiveMetastoreMessages() throws Exception { - List<HookNotification> ret = new ArrayList<>(); - AlterTableEvent tblEvent = (AlterTableEvent) context.getMetastoreEvent(); - Table oldTable = toTable(tblEvent.getOldTable()); - Table newTable = toTable(tblEvent.getNewTable()); - - if (newTable == null) { - LOG.error("AlterTableRename: renamed table not found in outputs list"); - - return ret; - } - - processTables(oldTable, newTable, ret); - - return ret; - } - - public List<HookNotification> getHiveMessages() throws Exception { List<HookNotification> ret = new ArrayList<>(); - Table oldTable; - Table newTable; - if (CollectionUtils.isEmpty(getInputs())) { + if (CollectionUtils.isEmpty(getHiveContext().getInputs())) { LOG.error("AlterTableRename: old-table not found in inputs list"); return ret; } - oldTable = getInputs().iterator().next().getTable(); - newTable = null; + Table oldTable = getHiveContext().getInputs().iterator().next().getTable(); + Table newTable = null; - if (CollectionUtils.isNotEmpty(getOutputs())) { - for (WriteEntity entity : getOutputs()) { + if (CollectionUtils.isNotEmpty(getHiveContext().getOutputs())) { + for (WriteEntity entity : getHiveContext().getOutputs()) { if (entity.getType() == Entity.Type.TABLE) { newTable = entity.getTable(); //Hive sends with both old and new table names in the outputs which is weird. So skipping that with the below check - if (StringUtils.equalsIgnoreCase(newTable.getDbName(), oldTable.getDbName()) && - StringUtils.equalsIgnoreCase(newTable.getTableName(), oldTable.getTableName())) { + if (StringUtils.equalsIgnoreCase(newTable.getDbName(), oldTable.getDbName()) && StringUtils.equalsIgnoreCase(newTable.getTableName(), oldTable.getTableName())) { newTable = null; continue; @@ -110,17 +85,11 @@ public class AlterTableRename extends BaseHiveEvent { return ret; } - processTables(oldTable, newTable, ret); - - return ret; - } - - private void processTables(Table oldTable, Table newTable, List<HookNotification> ret) throws Exception { AtlasEntityWithExtInfo oldTableEntity = toTableEntity(oldTable); AtlasEntityWithExtInfo renamedTableEntity = toTableEntity(newTable); if (oldTableEntity == null || renamedTableEntity == null) { - return; + return ret; } // first update with oldTable info, so that the table will be created if it is not present in Atlas @@ -141,13 +110,14 @@ public class AlterTableRename extends BaseHiveEvent { // set previous name as the alias renamedTableEntity.getEntity().setAttribute(ATTRIBUTE_ALIASES, Collections.singletonList(oldTable.getTableName())); - String oldTableQualifiedName = (String) oldTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME); - AtlasObjectId oldTableId = new AtlasObjectId(oldTableEntity.getEntity().getTypeName(), ATTRIBUTE_QUALIFIED_NAME, oldTableQualifiedName); + AtlasObjectId oldTableId = new AtlasObjectId(oldTableEntity.getEntity().getTypeName(), ATTRIBUTE_QUALIFIED_NAME, oldTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME)); // update qualifiedName and other attributes (like params - which include lastModifiedTime, lastModifiedBy) of the table ret.add(new EntityPartialUpdateRequestV2(getUserName(), oldTableId, renamedTableEntity)); - context.removeFromKnownTable(oldTableQualifiedName); + context.removeFromKnownTable((String) oldTableEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + + return ret; } private void renameColumns(List<AtlasObjectId> columns, AtlasEntityExtInfo oldEntityExtInfo, String newTableQualifiedName, List<HookNotification> notifications) { diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRenameCol.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRenameCol.java index 756a608..5bbdd81 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRenameCol.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/AlterTableRenameCol.java @@ -26,7 +26,6 @@ import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification.EntityPartialUpdateRequestV2; import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.events.AlterTableEvent; import org.apache.hadoop.hive.ql.metadata.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,79 +34,64 @@ import java.util.ArrayList; import java.util.List; public class AlterTableRenameCol extends AlterTable { - private static final Logger LOG = LoggerFactory.getLogger(AlterTableRenameCol.class); - private final FieldSchema columnOld; - private final FieldSchema columnNew; + private static final Logger LOG = LoggerFactory.getLogger(AlterTableRenameCol.class); public AlterTableRenameCol(AtlasHiveHookContext context) { - this(null, null, context); - } - - public AlterTableRenameCol(FieldSchema columnOld, FieldSchema columnNew, AtlasHiveHookContext context) { super(context); - - this.columnOld = columnOld; - this.columnNew = columnNew; } @Override public List<HookNotification> getNotificationMessages() throws Exception { - return context.isMetastoreHook() ? getHiveMetastoreMessages() : getHiveMessages(); - } - - public List<HookNotification> getHiveMetastoreMessages() throws Exception { - List<HookNotification> baseMsgs = super.getNotificationMessages(); - List<HookNotification> ret = new ArrayList<>(baseMsgs); - AlterTableEvent tblEvent = (AlterTableEvent) context.getMetastoreEvent(); - Table oldTable = toTable(tblEvent.getOldTable()); - Table newTable = toTable(tblEvent.getNewTable()); - - processColumns(oldTable, newTable, ret); - - return ret; - } - - public List<HookNotification> getHiveMessages() throws Exception { - List<HookNotification> baseMsgs = super.getNotificationMessages(); - List<HookNotification> ret = new ArrayList<>(baseMsgs); - - if (CollectionUtils.isEmpty(getInputs())) { + if (CollectionUtils.isEmpty(getHiveContext().getInputs())) { LOG.error("AlterTableRenameCol: old-table not found in inputs list"); return null; } - if (CollectionUtils.isEmpty(getOutputs())) { + if (CollectionUtils.isEmpty(getHiveContext().getOutputs())) { LOG.error("AlterTableRenameCol: new-table not found in outputs list"); return null; } + List<HookNotification> baseMsgs = super.getNotificationMessages(); + if (CollectionUtils.isEmpty(baseMsgs)) { LOG.debug("Skipped processing of column-rename (on a temporary table?)"); return null; } - Table oldTable = getInputs().iterator().next().getTable(); - Table newTable = getOutputs().iterator().next().getTable(); + List<HookNotification> ret = new ArrayList<>(baseMsgs); + Table oldTable = getHiveContext().getInputs().iterator().next().getTable(); + Table newTable = getHiveContext().getOutputs().iterator().next().getTable(); + + newTable = getHive().getTable(newTable.getDbName(), newTable.getTableName()); - if (newTable != null) { - newTable = getHive().getTable(newTable.getDbName(), newTable.getTableName()); - } + List<FieldSchema> oldColumns = oldTable.getCols(); + List<FieldSchema> newColumns = newTable.getCols(); + FieldSchema changedColumnOld = null; + FieldSchema changedColumnNew = null; - processColumns(oldTable, newTable, ret); + for (FieldSchema oldColumn : oldColumns) { + if (!newColumns.contains(oldColumn)) { + changedColumnOld = oldColumn; - return ret; - } + break; + } + } - private void processColumns(Table oldTable, Table newTable, List<HookNotification> ret) { - FieldSchema changedColumnOld = (columnOld == null) ? findRenamedColumn(oldTable, newTable) : columnOld; - FieldSchema changedColumnNew = (columnNew == null) ? findRenamedColumn(newTable, oldTable) : columnNew; + for (FieldSchema newColumn : newColumns) { + if (!oldColumns.contains(newColumn)) { + changedColumnNew = newColumn; + + break; + } + } if (changedColumnOld != null && changedColumnNew != null) { AtlasObjectId oldColumnId = new AtlasObjectId(HIVE_TYPE_COLUMN, ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(oldTable, changedColumnOld)); - AtlasEntity newColumn = new AtlasEntity(HIVE_TYPE_COLUMN); + AtlasEntity newColumn = new AtlasEntity(HIVE_TYPE_COLUMN); newColumn.setAttribute(ATTRIBUTE_NAME, changedColumnNew.getName()); newColumn.setAttribute(ATTRIBUTE_QUALIFIED_NAME, getQualifiedName(newTable, changedColumnNew)); @@ -116,21 +100,7 @@ public class AlterTableRenameCol extends AlterTable { } else { LOG.error("AlterTableRenameCol: no renamed column detected"); } - } - - public static FieldSchema findRenamedColumn(Table inputTable, Table outputTable) { - FieldSchema ret = null; - List<FieldSchema> inputColumns = inputTable.getCols(); - List<FieldSchema> outputColumns = outputTable.getCols(); - - for (FieldSchema inputColumn : inputColumns) { - if (!outputColumns.contains(inputColumn)) { - ret = inputColumn; - - break; - } - } return ret; } -} \ No newline at end of file +} diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java index 4127c61..31346d0 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/BaseHiveEvent.java @@ -37,10 +37,11 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.utils.SecurityUtils; -import org.apache.hadoop.hive.ql.hooks.*; +import org.apache.hadoop.hive.ql.hooks.Entity; +import org.apache.hadoop.hive.ql.hooks.HookContext; import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo; import org.apache.hadoop.hive.ql.hooks.LineageInfo.DependencyKey; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.HiveOperation; @@ -327,8 +328,7 @@ public abstract class BaseHiveEvent { } protected AtlasEntity toTableEntity(Table table, AtlasEntityExtInfo entityExtInfo) throws Exception { - Database db = getDatabases(table.getDbName()); - AtlasEntity dbEntity = toDbEntity(db); + AtlasEntity dbEntity = toDbEntity(getHive().getDatabase(table.getDbName())); if (entityExtInfo != null) { if (dbEntity != null) { @@ -594,7 +594,8 @@ public abstract class BaseHiveEvent { protected AtlasEntity getHiveProcessEntity(List<AtlasEntity> inputs, List<AtlasEntity> outputs) throws Exception { AtlasEntity ret = new AtlasEntity(HIVE_TYPE_PROCESS); - String queryStr = getQueryString(); + HookContext hookContext = getHiveContext(); + String queryStr = hookContext.getQueryPlan().getQueryStr(); if (queryStr != null) { queryStr = queryStr.toLowerCase().trim(); @@ -604,12 +605,12 @@ public abstract class BaseHiveEvent { ret.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputs)); ret.setAttribute(ATTRIBUTE_OUTPUTS, getObjectIds(outputs)); ret.setAttribute(ATTRIBUTE_NAME, queryStr); - ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, getOperationName()); - ret.setAttribute(ATTRIBUTE_START_TIME, getQueryStartTime()); + ret.setAttribute(ATTRIBUTE_OPERATION_TYPE, hookContext.getOperationName()); + ret.setAttribute(ATTRIBUTE_START_TIME, hookContext.getQueryPlan().getQueryStartTime()); ret.setAttribute(ATTRIBUTE_END_TIME, System.currentTimeMillis()); ret.setAttribute(ATTRIBUTE_USER_NAME, getUserName()); ret.setAttribute(ATTRIBUTE_QUERY_TEXT, queryStr); - ret.setAttribute(ATTRIBUTE_QUERY_ID, getQueryId()); + ret.setAttribute(ATTRIBUTE_QUERY_ID, hookContext.getQueryPlan().getQuery().getQueryId()); ret.setAttribute(ATTRIBUTE_QUERY_PLAN, "Not Supported"); ret.setAttribute(ATTRIBUTE_RECENT_QUERIES, Collections.singletonList(queryStr)); @@ -620,86 +621,34 @@ public abstract class BaseHiveEvent { return context.getClusterName(); } - protected Database getDatabases(String dbName) throws Exception { - return context.isMetastoreHook() ? context.getMetastoreHandler().get_database(dbName) : - context.getHive().getDatabase(dbName); - } - protected Hive getHive() { return context.getHive(); } - protected Set<ReadEntity> getInputs() { - return context != null ? context.getInputs() : Collections.emptySet(); - } - - protected Set<WriteEntity> getOutputs() { - return context != null ? context.getOutputs() : Collections.emptySet(); - } - - protected LineageInfo getLineageInfo() { - return context != null ? context.getLineageInfo() : null; - } - - protected String getQueryString() { - return isHiveContextValid() ? context.getHiveContext().getQueryPlan().getQueryStr() : null; - } - - protected String getOperationName() { - return isHiveContextValid() ? context.getHiveContext().getOperationName() : null; - } - - protected String getHiveUserName() { - return isHiveContextValid() ? context.getHiveContext().getUserName() : null; - } - - protected UserGroupInformation getUgi() { - return isHiveContextValid() ? context.getHiveContext().getUgi() : null; - } - - protected Long getQueryStartTime() { - return isHiveContextValid() ? context.getHiveContext().getQueryPlan().getQueryStartTime() : null; - } - - protected String getQueryId() { - return isHiveContextValid() ? context.getHiveContext().getQueryPlan().getQueryId() : null; - } - - private boolean isHiveContextValid() { - return context != null && context.getHiveContext() != null; + protected HookContext getHiveContext() { + return context.getHiveContext(); } protected String getUserName() { - String ret = null; - UserGroupInformation ugi = null; + String ret = getHiveContext().getUserName(); - if (context.isMetastoreHook()) { - try { - ugi = SecurityUtils.getUGI(); - } catch (Exception e) { - //do nothing + if (StringUtils.isEmpty(ret)) { + UserGroupInformation ugi = getHiveContext().getUgi(); + + if (ugi != null) { + ret = ugi.getShortUserName(); } - } else { - ret = getHiveUserName(); if (StringUtils.isEmpty(ret)) { - ugi = getUgi(); + try { + ret = UserGroupInformation.getCurrentUser().getShortUserName(); + } catch (IOException e) { + LOG.warn("Failed for UserGroupInformation.getCurrentUser() ", e); + ret = System.getProperty("user.name"); + } } } - if (ugi != null) { - ret = ugi.getShortUserName(); - } - - if (StringUtils.isEmpty(ret)) { - try { - ret = UserGroupInformation.getCurrentUser().getShortUserName(); - } catch (IOException e) { - LOG.warn("Failed for UserGroupInformation.getCurrentUser() ", e); - - ret = System.getProperty("user.name"); - } - } return ret; } @@ -808,7 +757,7 @@ public abstract class BaseHiveEvent { operation == HiveOperation.CREATEVIEW || operation == HiveOperation.ALTERVIEW_AS || operation == HiveOperation.ALTERTABLE_LOCATION) { - List<? extends Entity> sortedEntities = new ArrayList<>(getOutputs()); + List<? extends Entity> sortedEntities = new ArrayList<>(getHiveContext().getOutputs()); Collections.sort(sortedEntities, entityComparator); @@ -825,23 +774,15 @@ public abstract class BaseHiveEvent { } } - String qualifiedName = null; - String operationName = getOperationName(); + StringBuilder sb = new StringBuilder(getHiveContext().getOperationName()); - if (operationName != null) { - StringBuilder sb = new StringBuilder(operationName); + boolean ignoreHDFSPaths = ignoreHDFSPathsinProcessQualifiedName(); - boolean ignoreHDFSPaths = ignoreHDFSPathsinProcessQualifiedName(); + addToProcessQualifiedName(sb, getHiveContext().getInputs(), ignoreHDFSPaths); + sb.append("->"); + addToProcessQualifiedName(sb, getHiveContext().getOutputs(), ignoreHDFSPaths); - addToProcessQualifiedName(sb, getInputs(), ignoreHDFSPaths); - sb.append("->"); - addToProcessQualifiedName(sb, getOutputs(), ignoreHDFSPaths); - - qualifiedName = sb.toString(); - } - - - return qualifiedName; + return sb.toString(); } protected AtlasEntity toReferencedHBaseTable(Table table, AtlasEntitiesWithExtInfo entities) { @@ -895,9 +836,9 @@ public abstract class BaseHiveEvent { switch (context.getHiveOperation()) { case LOAD: case IMPORT: - return hasPartitionEntity(getOutputs()); + return hasPartitionEntity(getHiveContext().getOutputs()); case EXPORT: - return hasPartitionEntity(getInputs()); + return hasPartitionEntity(getHiveContext().getInputs()); case QUERY: return true; } @@ -1065,8 +1006,4 @@ public abstract class BaseHiveEvent { return hbaseTableName; } } - - public static Table toTable(org.apache.hadoop.hive.metastore.api.Table table) { - return new Table(table); - } } diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateDatabase.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateDatabase.java index b01f61f..d017dbe 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateDatabase.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateDatabase.java @@ -25,7 +25,6 @@ import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2; import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent; import org.apache.hadoop.hive.ql.hooks.Entity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,8 +32,6 @@ import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; -import static org.apache.hadoop.hive.ql.hooks.Entity.Type.DATABASE; - public class CreateDatabase extends BaseHiveEvent { private static final Logger LOG = LoggerFactory.getLogger(CreateDatabase.class); @@ -45,7 +42,7 @@ public class CreateDatabase extends BaseHiveEvent { @Override public List<HookNotification> getNotificationMessages() throws Exception { List<HookNotification> ret = null; - AtlasEntitiesWithExtInfo entities = context.isMetastoreHook() ? getHiveMetastoreEntities() : getHiveEntities(); + AtlasEntitiesWithExtInfo entities = getEntities(); if (entities != null && CollectionUtils.isNotEmpty(entities.getEntities())) { ret = Collections.singletonList(new EntityCreateRequestV2(getUserName(), entities)); @@ -54,29 +51,11 @@ public class CreateDatabase extends BaseHiveEvent { return ret; } - public AtlasEntitiesWithExtInfo getHiveMetastoreEntities() throws Exception { - AtlasEntitiesWithExtInfo ret = new AtlasEntitiesWithExtInfo(); - CreateDatabaseEvent dbEvent = (CreateDatabaseEvent) context.getMetastoreEvent(); - Database db = dbEvent.getDatabase(); - - if (db != null) { - AtlasEntity dbEntity = toDbEntity(db); - - ret.addEntity(dbEntity); - } else { - LOG.error("CreateDatabase.getEntities(): failed to retrieve db"); - } - - addProcessedEntities(ret); - - return ret; - } - - public AtlasEntitiesWithExtInfo getHiveEntities() throws Exception { + public AtlasEntitiesWithExtInfo getEntities() throws Exception { AtlasEntitiesWithExtInfo ret = new AtlasEntitiesWithExtInfo(); - for (Entity entity : getOutputs()) { - if (entity.getType() == DATABASE) { + for (Entity entity : getHiveContext().getOutputs()) { + if (entity.getType() == Entity.Type.DATABASE) { Database db = entity.getDatabase(); if (db != null) { @@ -97,4 +76,4 @@ public class CreateDatabase extends BaseHiveEvent { return ret; } -} \ No newline at end of file +} diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java index 019deb7..d61f1d7 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateHiveProcess.java @@ -72,12 +72,13 @@ public class CreateHiveProcess extends BaseHiveEvent { if (!skipProcess()) { List<AtlasEntity> inputs = new ArrayList<>(); List<AtlasEntity> outputs = new ArrayList<>(); + HookContext hiveContext = getHiveContext(); Set<String> processedNames = new HashSet<>(); ret = new AtlasEntitiesWithExtInfo(); - if (getInputs() != null) { - for (ReadEntity input : getInputs()) { + if (hiveContext.getInputs() != null) { + for (ReadEntity input : hiveContext.getInputs()) { String qualifiedName = getQualifiedName(input); if (qualifiedName == null || !processedNames.add(qualifiedName)) { @@ -96,8 +97,8 @@ public class CreateHiveProcess extends BaseHiveEvent { } } - if (getOutputs() != null) { - for (WriteEntity output : getOutputs()) { + if (hiveContext.getOutputs() != null) { + for (WriteEntity output : hiveContext.getOutputs()) { String qualifiedName = getQualifiedName(output); if (qualifiedName == null || !processedNames.add(qualifiedName)) { @@ -129,7 +130,7 @@ public class CreateHiveProcess extends BaseHiveEvent { } private void processColumnLineage(AtlasEntity hiveProcess, AtlasEntitiesWithExtInfo entities) { - LineageInfo lineageInfo = getLineageInfo(); + LineageInfo lineageInfo = getHiveContext().getLinfo(); if (lineageInfo == null || CollectionUtils.isEmpty(lineageInfo.entrySet())) { return; @@ -234,8 +235,8 @@ public class CreateHiveProcess extends BaseHiveEvent { private boolean skipProcess() { - Set<ReadEntity> inputs = getInputs(); - Set<WriteEntity> outputs = getOutputs(); + Set<ReadEntity> inputs = getHiveContext().getInputs(); + Set<WriteEntity> outputs = getHiveContext().getOutputs(); boolean ret = CollectionUtils.isEmpty(inputs) && CollectionUtils.isEmpty(outputs); diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java index b6ec5c3..674a89f 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/CreateTable.java @@ -24,19 +24,14 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2; import org.apache.commons.collections.CollectionUtils; -import org.apache.hadoop.hive.metastore.events.AlterTableEvent; -import org.apache.hadoop.hive.metastore.events.CreateTableEvent; -import org.apache.hadoop.hive.metastore.events.ListenerEvent; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.hooks.Entity; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.plan.HiveOperation; import java.util.Collections; import java.util.List; -import static org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE; -import static org.apache.hadoop.hive.ql.plan.HiveOperation.*; - public class CreateTable extends BaseHiveEvent { private final boolean skipTempTables; @@ -49,7 +44,7 @@ public class CreateTable extends BaseHiveEvent { @Override public List<HookNotification> getNotificationMessages() throws Exception { List<HookNotification> ret = null; - AtlasEntitiesWithExtInfo entities = context.isMetastoreHook() ? getHiveMetastoreEntities() : getHiveEntities(); + AtlasEntitiesWithExtInfo entities = getEntities(); if (entities != null && CollectionUtils.isNotEmpty(entities.getEntities())) { ret = Collections.singletonList(new EntityCreateRequestV2(getUserName(), entities)); @@ -58,62 +53,31 @@ public class CreateTable extends BaseHiveEvent { return ret; } - public AtlasEntitiesWithExtInfo getHiveMetastoreEntities() throws Exception { - AtlasEntitiesWithExtInfo ret = new AtlasEntitiesWithExtInfo(); - ListenerEvent event = context.getMetastoreEvent(); - HiveOperation oper = context.getHiveOperation(); - Table table; - - if (isAlterTable(oper)) { - table = toTable(((AlterTableEvent) event).getNewTable()); - } else { - table = toTable(((CreateTableEvent) event).getTable()); - } - - if (skipTemporaryTable(table)) { - table = null; - } - - processTable(table, ret); - - addProcessedEntities(ret); - - return ret; - } - - public AtlasEntitiesWithExtInfo getHiveEntities() throws Exception { + public AtlasEntitiesWithExtInfo getEntities() throws Exception { AtlasEntitiesWithExtInfo ret = new AtlasEntitiesWithExtInfo(); + Database db = null; Table table = null; - if (CollectionUtils.isNotEmpty(getOutputs())) { - for (Entity entity : getOutputs()) { - if (entity.getType() == Entity.Type.TABLE) { - table = entity.getTable(); + for (Entity entity : getHiveContext().getOutputs()) { + if (entity.getType() == Entity.Type.TABLE) { + table = entity.getTable(); + + if (table != null) { + db = getHive().getDatabase(table.getDbName()); + table = getHive().getTable(table.getDbName(), table.getTableName()); if (table != null) { - table = getHive().getTable(table.getDbName(), table.getTableName()); - - if (table != null) { - if (skipTemporaryTable(table)) { - table = null; - } else { - break; - } + // If its an external table, even though the temp table skip flag is on, we create the table since we need the HDFS path to temp table lineage. + if (skipTempTables && table.isTemporary() && !TableType.EXTERNAL_TABLE.equals(table.getTableType())) { + table = null; + } else { + break; } } } } } - processTable(table, ret); - - addProcessedEntities(ret); - - return ret; - } - - // create process entities for lineages from HBase/HDFS to hive table - private void processTable(Table table, AtlasEntitiesWithExtInfo ret) throws Exception { if (table != null) { AtlasEntity tblEntity = toTableEntity(table, ret); @@ -125,7 +89,7 @@ public class CreateTable extends BaseHiveEvent { if (hbaseTableEntity != null) { final AtlasEntity processEntity; - if (EXTERNAL_TABLE.equals(table.getTableType())) { + if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) { processEntity = getHiveProcessEntity(Collections.singletonList(hbaseTableEntity), Collections.singletonList(tblEntity)); } else { processEntity = getHiveProcessEntity(Collections.singletonList(tblEntity), Collections.singletonList(hbaseTableEntity)); @@ -134,7 +98,7 @@ public class CreateTable extends BaseHiveEvent { ret.addEntity(processEntity); } } else { - if (EXTERNAL_TABLE.equals(table.getTableType())) { + if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) { AtlasEntity hdfsPathEntity = getPathEntity(table.getDataLocation(), ret); AtlasEntity processEntity = getHiveProcessEntity(Collections.singletonList(hdfsPathEntity), Collections.singletonList(tblEntity)); @@ -144,14 +108,9 @@ public class CreateTable extends BaseHiveEvent { } } } - } - private static boolean isAlterTable(HiveOperation oper) { - return (oper == ALTERTABLE_PROPERTIES || oper == ALTERTABLE_RENAME || oper == ALTERTABLE_RENAMECOL); - } + addProcessedEntities(ret); - private boolean skipTemporaryTable(Table table) { - // If its an external table, even though the temp table skip flag is on, we create the table since we need the HDFS path to temp table lineage. - return table != null && skipTempTables && table.isTemporary() && !EXTERNAL_TABLE.equals(table.getTableType()); + return ret; } } diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropDatabase.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropDatabase.java index 20019d2..1795bf2 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropDatabase.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropDatabase.java @@ -23,25 +23,21 @@ import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2; import org.apache.commons.collections.CollectionUtils; -import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent; import org.apache.hadoop.hive.ql.hooks.Entity; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import static org.apache.hadoop.hive.ql.hooks.Entity.Type.DATABASE; -import static org.apache.hadoop.hive.ql.hooks.Entity.Type.TABLE; - public class DropDatabase extends BaseHiveEvent { public DropDatabase(AtlasHiveHookContext context) { super(context); } @Override - public List<HookNotification> getNotificationMessages() { + public List<HookNotification> getNotificationMessages() throws Exception { List<HookNotification> ret = null; - List<AtlasObjectId> entities = context.isMetastoreHook() ? getHiveMetastoreEntities() : getHiveEntities(); + List<AtlasObjectId> entities = getEntities(); if (CollectionUtils.isNotEmpty(entities)) { ret = new ArrayList<>(entities.size()); @@ -54,40 +50,27 @@ public class DropDatabase extends BaseHiveEvent { return ret; } - private List<AtlasObjectId> getHiveMetastoreEntities() { - List<AtlasObjectId> ret = new ArrayList<>(); - DropDatabaseEvent dbEvent = (DropDatabaseEvent) context.getMetastoreEvent(); - String dbQName = getQualifiedName(dbEvent.getDatabase()); - AtlasObjectId dbId = new AtlasObjectId(HIVE_TYPE_DB, ATTRIBUTE_QUALIFIED_NAME, dbQName); - - context.removeFromKnownDatabase(dbQName); - - ret.add(dbId); - - return ret; - } - - private List<AtlasObjectId> getHiveEntities() { + public List<AtlasObjectId> getEntities() throws Exception { List<AtlasObjectId> ret = new ArrayList<>(); - for (Entity entity : getOutputs()) { - if (entity.getType() == DATABASE) { + for (Entity entity : getHiveContext().getOutputs()) { + if (entity.getType() == Entity.Type.DATABASE) { String dbQName = getQualifiedName(entity.getDatabase()); AtlasObjectId dbId = new AtlasObjectId(HIVE_TYPE_DB, ATTRIBUTE_QUALIFIED_NAME, dbQName); context.removeFromKnownDatabase(dbQName); ret.add(dbId); - } else if (entity.getType() == TABLE) { + } else if (entity.getType() == Entity.Type.TABLE) { String tblQName = getQualifiedName(entity.getTable()); - AtlasObjectId tblId = new AtlasObjectId(HIVE_TYPE_TABLE, ATTRIBUTE_QUALIFIED_NAME, tblQName); + AtlasObjectId dbId = new AtlasObjectId(HIVE_TYPE_TABLE, ATTRIBUTE_QUALIFIED_NAME, tblQName); context.removeFromKnownTable(tblQName); - ret.add(tblId); + ret.add(dbId); } } return ret; } -} \ No newline at end of file +} diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropTable.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropTable.java index 440c08a..a0f7d8a 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropTable.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/events/DropTable.java @@ -23,9 +23,7 @@ import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2; import org.apache.commons.collections.CollectionUtils; -import org.apache.hadoop.hive.metastore.events.DropTableEvent; import org.apache.hadoop.hive.ql.hooks.Entity; -import org.apache.hadoop.hive.ql.metadata.Table; import java.util.ArrayList; import java.util.Collections; @@ -37,9 +35,9 @@ public class DropTable extends BaseHiveEvent { } @Override - public List<HookNotification> getNotificationMessages() { + public List<HookNotification> getNotificationMessages() throws Exception { List<HookNotification> ret = null; - List<AtlasObjectId> entities = context.isMetastoreHook() ? getHiveMetastoreEntities() : getHiveEntities(); + List<AtlasObjectId> entities = getEntities(); if (CollectionUtils.isNotEmpty(entities)) { ret = new ArrayList<>(entities.size()); @@ -52,34 +50,20 @@ public class DropTable extends BaseHiveEvent { return ret; } - public List<AtlasObjectId> getHiveMetastoreEntities() { - List<AtlasObjectId> ret = new ArrayList<>(); - DropTableEvent tblEvent = (DropTableEvent) context.getMetastoreEvent(); - Table table = new Table(tblEvent.getTable()); - String tblQName = getQualifiedName(table); - AtlasObjectId tblId = new AtlasObjectId(HIVE_TYPE_TABLE, ATTRIBUTE_QUALIFIED_NAME, tblQName); - - context.removeFromKnownTable(tblQName); - - ret.add(tblId); - - return ret; - } - - public List<AtlasObjectId> getHiveEntities() { + public List<AtlasObjectId> getEntities() throws Exception { List<AtlasObjectId> ret = new ArrayList<>(); - for (Entity entity : getOutputs()) { + for (Entity entity : getHiveContext().getOutputs()) { if (entity.getType() == Entity.Type.TABLE) { String tblQName = getQualifiedName(entity.getTable()); - AtlasObjectId tblId = new AtlasObjectId(HIVE_TYPE_TABLE, ATTRIBUTE_QUALIFIED_NAME, tblQName); + AtlasObjectId dbId = new AtlasObjectId(HIVE_TYPE_TABLE, ATTRIBUTE_QUALIFIED_NAME, tblQName); context.removeFromKnownTable(tblQName); - ret.add(tblId); + ret.add(dbId); } } return ret; } -} \ No newline at end of file +}