Repository: atlas Updated Branches: refs/heads/branch-0.8 99593dff7 -> e8908dbfe
ATLAS-2439: updated Sqoop hook to use V2 notifications Signed-off-by: Madhan Neethiraj <mad...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/e8908dbf Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/e8908dbf Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/e8908dbf Branch: refs/heads/branch-0.8 Commit: e8908dbfe1d4dfe641cc7a802625f396aa0a399d Parents: 99593df Author: rdsolani <rdsol...@gmail.com> Authored: Thu Feb 1 19:41:33 2018 +0530 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Tue Feb 13 14:03:34 2018 -0800 ---------------------------------------------------------------------- .../org/apache/atlas/sqoop/hook/SqoopHook.java | 213 ++++++++++--------- 1 file changed, 114 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/e8908dbf/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java ---------------------------------------------------------------------- diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java index 50e20fa..5201122 100644 --- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java +++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java @@ -26,9 +26,12 @@ import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; import org.apache.atlas.hive.model.HiveDataTypes; import org.apache.atlas.hook.AtlasHook; import org.apache.atlas.hook.AtlasHookException; -import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequestV2; +import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; import org.apache.atlas.sqoop.model.SqoopDataTypes; -import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.type.AtlasTypeUtil; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang3.StringUtils; import org.apache.sqoop.SqoopJobDataPublisher; @@ -46,151 +49,163 @@ import java.util.Properties; * AtlasHook sends lineage information to the AtlasSever. */ public class SqoopHook extends SqoopJobDataPublisher { - private static final Logger LOG = LoggerFactory.getLogger(SqoopHook.class); - public static final String CONF_PREFIX = "atlas.hook.sqoop."; - public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; - public static final String ATLAS_CLUSTER_NAME = "atlas.cluster.name"; + public static final String CONF_PREFIX = "atlas.hook.sqoop."; + public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; + public static final String ATLAS_CLUSTER_NAME = "atlas.cluster.name"; public static final String DEFAULT_CLUSTER_NAME = "primary"; - public static final String USER = "userName"; - public static final String DB_STORE_TYPE = "dbStoreType"; + public static final String USER = "userName"; + public static final String DB_STORE_TYPE = "dbStoreType"; public static final String DB_STORE_USAGE = "storeUse"; - public static final String SOURCE = "source"; - public static final String DESCRIPTION = "description"; - public static final String STORE_URI = "storeUri"; - public static final String OPERATION = "operation"; - public static final String START_TIME = "startTime"; - public static final String END_TIME = "endTime"; - public static final String CMD_LINE_OPTS = "commandlineOpts"; - // multiple inputs and outputs for process - public static final String INPUTS = "inputs"; - public static final String OUTPUTS = "outputs"; + public static final String SOURCE = "source"; + public static final String DESCRIPTION = "description"; + public static final String STORE_URI = "storeUri"; + public static final String OPERATION = "operation"; + public static final String START_TIME = "startTime"; + public static final String END_TIME = "endTime"; + public static final String CMD_LINE_OPTS = "commandlineOpts"; + public static final String INPUTS = "inputs"; + public static final String OUTPUTS = "outputs"; static { org.apache.hadoop.conf.Configuration.addDefaultResource("sqoop-site.xml"); } - public Referenceable createHiveDatabaseInstance(String clusterName, String dbName) { - Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName()); - dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName); - dbRef.set(AtlasClient.NAME, dbName); - dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName)); - return dbRef; - } + @Override + public void publish(SqoopJobDataPublisher.Data data) throws AtlasHookException { + try { + Configuration atlasProperties = ApplicationProperties.get(); + String clusterName = atlasProperties.getString(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); + AtlasEntity entDbStore = createDBStoreInstance(data); + AtlasEntity entHiveDb = createHiveDatabaseInstance(clusterName, data.getHiveDB()); + AtlasEntity entHiveTable = createHiveTableInstance(entHiveDb, data.getHiveTable()); + AtlasEntity entProcess = createSqoopProcessInstance(entDbStore, entHiveTable, data, clusterName); + + AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(entProcess); - public Referenceable createHiveTableInstance(String clusterName, Referenceable dbRef, - String tableName, String dbName) { - Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName()); - tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName)); - tableRef.set(AtlasClient.NAME, tableName.toLowerCase()); - tableRef.set(HiveMetaStoreBridge.DB, dbRef); - return tableRef; + entities.addReferredEntity(entDbStore); + entities.addReferredEntity(entHiveDb); + entities.addReferredEntity(entHiveTable); + + HookNotificationMessage message = new EntityUpdateRequestV2(AtlasHook.getUser(), entities); + + AtlasHook.notifyEntities(Arrays.asList(message), atlasProperties.getInt(HOOK_NUM_RETRIES, 3)); + } catch(Exception e) { + throw new AtlasHookException("SqoopHook.publish() failed.", e); } + } - private Referenceable createDBStoreInstance(SqoopJobDataPublisher.Data data) - throws ImportException { + private AtlasEntity createHiveDatabaseInstance(String clusterName, String dbName) { + AtlasEntity entHiveDb = new AtlasEntity(HiveDataTypes.HIVE_DB.getName()); + String qualifiedName = HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName); + + entHiveDb.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName); + entHiveDb.setAttribute(AtlasClient.NAME, dbName); + entHiveDb.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName); + + return entHiveDb; + } - Referenceable storeRef = new Referenceable(SqoopDataTypes.SQOOP_DBDATASTORE.getName()); + private AtlasEntity createHiveTableInstance(AtlasEntity entHiveDb, String tableName) { + AtlasEntity entHiveTable = new AtlasEntity(HiveDataTypes.HIVE_TABLE.getName()); + String qualifiedName = HiveMetaStoreBridge.getTableQualifiedName((String)entHiveDb.getAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE), (String)entHiveDb.getAttribute(AtlasClient.NAME), tableName); + + entHiveTable.setAttribute(AtlasClient.NAME, tableName.toLowerCase()); + entHiveTable.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName); + entHiveTable.setAttribute(HiveMetaStoreBridge.DB, AtlasTypeUtil.getAtlasObjectId(entHiveDb)); + + return entHiveTable; + } + + private AtlasEntity createDBStoreInstance(SqoopJobDataPublisher.Data data) throws ImportException { String table = data.getStoreTable(); String query = data.getStoreQuery(); + if (StringUtils.isBlank(table) && StringUtils.isBlank(query)) { throw new ImportException("Both table and query cannot be empty for DBStoreInstance"); } - String usage = table != null ? "TABLE" : "QUERY"; + String usage = table != null ? "TABLE" : "QUERY"; String source = table != null ? table : query; - String name = getSqoopDBStoreName(data); - storeRef.set(AtlasClient.NAME, name); - storeRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name); - storeRef.set(SqoopHook.DB_STORE_TYPE, data.getStoreType()); - storeRef.set(SqoopHook.DB_STORE_USAGE, usage); - storeRef.set(SqoopHook.STORE_URI, data.getUrl()); - storeRef.set(SqoopHook.SOURCE, source); - storeRef.set(SqoopHook.DESCRIPTION, ""); - storeRef.set(AtlasClient.OWNER, data.getUser()); - return storeRef; + String name = getSqoopDBStoreName(data); + + AtlasEntity entDbStore = new AtlasEntity(SqoopDataTypes.SQOOP_DBDATASTORE.getName()); + + entDbStore.setAttribute(AtlasClient.NAME, name); + entDbStore.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name); + entDbStore.setAttribute(SqoopHook.DB_STORE_TYPE, data.getStoreType()); + entDbStore.setAttribute(SqoopHook.DB_STORE_USAGE, usage); + entDbStore.setAttribute(SqoopHook.STORE_URI, data.getUrl()); + entDbStore.setAttribute(SqoopHook.SOURCE, source); + entDbStore.setAttribute(SqoopHook.DESCRIPTION, ""); + entDbStore.setAttribute(AtlasClient.OWNER, data.getUser()); + + return entDbStore; } - private Referenceable createSqoopProcessInstance(Referenceable dbStoreRef, Referenceable hiveTableRef, - SqoopJobDataPublisher.Data data, String clusterName) { - Referenceable procRef = new Referenceable(SqoopDataTypes.SQOOP_PROCESS.getName()); - final String sqoopProcessName = getSqoopProcessName(data, clusterName); - procRef.set(AtlasClient.NAME, sqoopProcessName); - procRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sqoopProcessName); - procRef.set(SqoopHook.OPERATION, data.getOperation()); - if (isImportOperation(data)) { - procRef.set(SqoopHook.INPUTS, dbStoreRef); - procRef.set(SqoopHook.OUTPUTS, hiveTableRef); - } else { - procRef.set(SqoopHook.INPUTS, hiveTableRef); - procRef.set(SqoopHook.OUTPUTS, dbStoreRef); - } - procRef.set(SqoopHook.USER, data.getUser()); - procRef.set(SqoopHook.START_TIME, new Date(data.getStartTime())); - procRef.set(SqoopHook.END_TIME, new Date(data.getEndTime())); + private AtlasEntity createSqoopProcessInstance(AtlasEntity entDbStore, AtlasEntity entHiveTable, SqoopJobDataPublisher.Data data, String clusterName) { + AtlasEntity entProcess = new AtlasEntity(SqoopDataTypes.SQOOP_PROCESS.getName()); + String sqoopProcessName = getSqoopProcessName(data, clusterName); + Map<String, String> sqoopOptionsMap = new HashMap<>(); + Properties options = data.getOptions(); - Map<String, String> sqoopOptionsMap = new HashMap<>(); - Properties options = data.getOptions(); for (Object k : options.keySet()) { sqoopOptionsMap.put((String)k, (String) options.get(k)); } - procRef.set(SqoopHook.CMD_LINE_OPTS, sqoopOptionsMap); - return procRef; + entProcess.setAttribute(AtlasClient.NAME, sqoopProcessName); + entProcess.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sqoopProcessName); + entProcess.setAttribute(SqoopHook.OPERATION, data.getOperation()); + + if (isImportOperation(data)) { + entProcess.setAttribute(SqoopHook.INPUTS, AtlasTypeUtil.getAtlasObjectId(entDbStore)); + entProcess.setAttribute(SqoopHook.OUTPUTS, AtlasTypeUtil.getAtlasObjectId(entHiveTable)); + } else { + entProcess.setAttribute(SqoopHook.INPUTS, AtlasTypeUtil.getAtlasObjectId(entHiveTable)); + entProcess.setAttribute(SqoopHook.OUTPUTS, AtlasTypeUtil.getAtlasObjectId(entDbStore)); + } + + entProcess.setAttribute(SqoopHook.USER, data.getUser()); + entProcess.setAttribute(SqoopHook.START_TIME, new Date(data.getStartTime())); + entProcess.setAttribute(SqoopHook.END_TIME, new Date(data.getEndTime())); + entProcess.setAttribute(SqoopHook.CMD_LINE_OPTS, sqoopOptionsMap); + + return entProcess; + } + + private boolean isImportOperation(SqoopJobDataPublisher.Data data) { + return data.getOperation().toLowerCase().equals("import"); } static String getSqoopProcessName(Data data, String clusterName) { - StringBuilder name = new StringBuilder(String.format("sqoop %s --connect %s", data.getOperation(), - data.getUrl())); + StringBuilder name = new StringBuilder(String.format("sqoop %s --connect %s", data.getOperation(), data.getUrl())); + if (StringUtils.isNotEmpty(data.getStoreTable())) { name.append(" --table ").append(data.getStoreTable()); } + if (StringUtils.isNotEmpty(data.getStoreQuery())) { name.append(" --query ").append(data.getStoreQuery()); } - name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s", - data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName)); + + name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s", data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName)); + return name.toString(); } static String getSqoopDBStoreName(SqoopJobDataPublisher.Data data) { StringBuilder name = new StringBuilder(String.format("%s --url %s", data.getStoreType(), data.getUrl())); + if (StringUtils.isNotEmpty(data.getStoreTable())) { name.append(" --table ").append(data.getStoreTable()); } + if (StringUtils.isNotEmpty(data.getStoreQuery())) { name.append(" --query ").append(data.getStoreQuery()); } - return name.toString(); - } - static boolean isImportOperation(SqoopJobDataPublisher.Data data) { - return data.getOperation().toLowerCase().equals("import"); - } - - @Override - public void publish(SqoopJobDataPublisher.Data data) throws AtlasHookException { - try { - Configuration atlasProperties = ApplicationProperties.get(); - String clusterName = atlasProperties.getString(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); - - Referenceable dbStoreRef = createDBStoreInstance(data); - Referenceable dbRef = createHiveDatabaseInstance(clusterName, data.getHiveDB()); - Referenceable hiveTableRef = createHiveTableInstance(clusterName, dbRef, - data.getHiveTable(), data.getHiveDB()); - Referenceable procRef = createSqoopProcessInstance(dbStoreRef, hiveTableRef, data, clusterName); - - int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3); - HookNotification.HookNotificationMessage message = - new HookNotification.EntityCreateRequest(AtlasHook.getUser(), dbStoreRef, dbRef, hiveTableRef, procRef); - AtlasHook.notifyEntities(Arrays.asList(message), maxRetries); - } - catch(Exception e) { - throw new AtlasHookException("SqoopHook.publish() failed.", e); - } + return name.toString(); } }