Repository: atlas Updated Branches: refs/heads/branch-0.8 4e27bf086 -> 7adbf8ffd
ATLAS-2462: Sqoop import for all tables throws NPE for no table provided in command Signed-off-by: Madhan Neethiraj <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/7adbf8ff Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/7adbf8ff Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/7adbf8ff Branch: refs/heads/branch-0.8 Commit: 7adbf8ffd27904577ecf694a9f861cdd46833069 Parents: 4e27bf0 Author: rdsolani <[email protected]> Authored: Fri Mar 23 17:18:20 2018 +0530 Committer: Madhan Neethiraj <[email protected]> Committed: Fri Mar 23 15:43:46 2018 -0700 ---------------------------------------------------------------------- .../org/apache/atlas/sqoop/hook/SqoopHook.java | 62 ++++++++++++-------- 1 file changed, 39 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/7adbf8ff/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java ---------------------------------------------------------------------- diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java index 9e43430..666ec13 100644 --- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java +++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java @@ -28,7 +28,8 @@ import org.apache.atlas.hook.AtlasHook; import org.apache.atlas.hook.AtlasHookException; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; -import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequestV2; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.notification.hook.HookNotification.EntityCreateRequestV2; import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; import org.apache.atlas.sqoop.model.SqoopDataTypes; import org.apache.atlas.type.AtlasTypeUtil; @@ -39,12 +40,12 @@ import org.apache.sqoop.util.ImportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; -import java.util.Date; -import java.util.HashMap; +import java.util.Collections; import java.util.Map; +import java.util.HashMap; import java.util.Properties; - +import java.util.List; +import java.util.Date; /** * AtlasHook sends lineage information to the AtlasSever. */ @@ -78,26 +79,30 @@ public class SqoopHook extends SqoopJobDataPublisher { try { Configuration atlasProperties = ApplicationProperties.get(); String clusterName = atlasProperties.getString(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); - AtlasEntity entDbStore = createDBStoreInstance(data); - AtlasEntity entHiveDb = createHiveDatabaseInstance(clusterName, data.getHiveDB()); - AtlasEntity entHiveTable = createHiveTableInstance(entHiveDb, data.getHiveTable()); - AtlasEntity entProcess = createSqoopProcessInstance(entDbStore, entHiveTable, data, clusterName); + AtlasEntity entDbStore = toSqoopDBStoreEntity(data); + AtlasEntity entHiveDb = toHiveDatabaseEntity(clusterName, data.getHiveDB()); + AtlasEntity entHiveTable = data.getHiveTable() != null ? toHiveTableEntity(entHiveDb, data.getHiveTable()) : null; + AtlasEntity entProcess = toSqoopProcessEntity(entDbStore, entHiveDb, entHiveTable, data, clusterName); AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo(entProcess); entities.addReferredEntity(entDbStore); entities.addReferredEntity(entHiveDb); - entities.addReferredEntity(entHiveTable); + if (entHiveTable != null) { + entities.addReferredEntity(entHiveTable); + } - HookNotificationMessage message = new EntityUpdateRequestV2(AtlasHook.getUser(), entities); + HookNotificationMessage message = new EntityCreateRequestV2(AtlasHook.getUser(), entities); - AtlasHook.notifyEntities(Arrays.asList(message), atlasProperties.getInt(HOOK_NUM_RETRIES, 3)); + AtlasHook.notifyEntities(Collections.singletonList(message), atlasProperties.getInt(HOOK_NUM_RETRIES, 3)); } catch(Exception e) { + LOG.error("SqoopHook.publish() failed", e); + throw new AtlasHookException("SqoopHook.publish() failed.", e); } } - private AtlasEntity createHiveDatabaseInstance(String clusterName, String dbName) { + private AtlasEntity toHiveDatabaseEntity(String clusterName, String dbName) { AtlasEntity entHiveDb = new AtlasEntity(HiveDataTypes.HIVE_DB.getName()); String qualifiedName = HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName); @@ -108,7 +113,7 @@ public class SqoopHook extends SqoopJobDataPublisher { return entHiveDb; } - private AtlasEntity createHiveTableInstance(AtlasEntity entHiveDb, String tableName) { + private AtlasEntity toHiveTableEntity(AtlasEntity entHiveDb, String tableName) { AtlasEntity entHiveTable = new AtlasEntity(HiveDataTypes.HIVE_TABLE.getName()); String qualifiedName = HiveMetaStoreBridge.getTableQualifiedName((String)entHiveDb.getAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE), (String)entHiveDb.getAttribute(AtlasClient.NAME), tableName); @@ -119,7 +124,7 @@ public class SqoopHook extends SqoopJobDataPublisher { return entHiveTable; } - private AtlasEntity createDBStoreInstance(SqoopJobDataPublisher.Data data) throws ImportException { + private AtlasEntity toSqoopDBStoreEntity(SqoopJobDataPublisher.Data data) throws ImportException { String table = data.getStoreTable(); String query = data.getStoreQuery(); @@ -145,7 +150,7 @@ public class SqoopHook extends SqoopJobDataPublisher { return entDbStore; } - private AtlasEntity createSqoopProcessInstance(AtlasEntity entDbStore, AtlasEntity entHiveTable, SqoopJobDataPublisher.Data data, String clusterName) { + private AtlasEntity toSqoopProcessEntity(AtlasEntity entDbStore, AtlasEntity entHiveDb, AtlasEntity entHiveTable, SqoopJobDataPublisher.Data data, String clusterName) { AtlasEntity entProcess = new AtlasEntity(SqoopDataTypes.SQOOP_PROCESS.getName()); String sqoopProcessName = getSqoopProcessName(data, clusterName); Map<String, String> sqoopOptionsMap = new HashMap<>(); @@ -159,12 +164,15 @@ public class SqoopHook extends SqoopJobDataPublisher { entProcess.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sqoopProcessName); entProcess.setAttribute(SqoopHook.OPERATION, data.getOperation()); + List<AtlasObjectId> sqoopObjects = Collections.singletonList(AtlasTypeUtil.getAtlasObjectId(entDbStore)); + List<AtlasObjectId> hiveObjects = Collections.singletonList(AtlasTypeUtil.getAtlasObjectId(entHiveTable != null ? entHiveTable : entHiveDb)); + if (isImportOperation(data)) { - entProcess.setAttribute(SqoopHook.INPUTS, Arrays.asList(AtlasTypeUtil.getAtlasObjectId(entDbStore))); - entProcess.setAttribute(SqoopHook.OUTPUTS, Arrays.asList(AtlasTypeUtil.getAtlasObjectId(entHiveTable))); + entProcess.setAttribute(SqoopHook.INPUTS, sqoopObjects); + entProcess.setAttribute(SqoopHook.OUTPUTS, hiveObjects); } else { - entProcess.setAttribute(SqoopHook.INPUTS, Arrays.asList(AtlasTypeUtil.getAtlasObjectId(entHiveTable))); - entProcess.setAttribute(SqoopHook.OUTPUTS, Arrays.asList(AtlasTypeUtil.getAtlasObjectId(entDbStore))); + entProcess.setAttribute(SqoopHook.INPUTS, hiveObjects); + entProcess.setAttribute(SqoopHook.OUTPUTS, sqoopObjects); } entProcess.setAttribute(SqoopHook.USER, data.getUser()); @@ -182,15 +190,21 @@ public class SqoopHook extends SqoopJobDataPublisher { static String getSqoopProcessName(Data data, String clusterName) { StringBuilder name = new StringBuilder(String.format("sqoop %s --connect %s", data.getOperation(), data.getUrl())); - if (StringUtils.isNotEmpty(data.getStoreTable())) { + if (StringUtils.isNotEmpty(data.getHiveTable())) { name.append(" --table ").append(data.getStoreTable()); + } else { + name.append(" --database ").append(data.getHiveDB()); } if (StringUtils.isNotEmpty(data.getStoreQuery())) { name.append(" --query ").append(data.getStoreQuery()); } - name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s", data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName)); + if (data.getHiveTable() != null) { + name.append(String.format(" --hive-%s --hive-database %s --hive-table %s --hive-cluster %s", data.getOperation(), data.getHiveDB().toLowerCase(), data.getHiveTable().toLowerCase(), clusterName)); + } else { + name.append(String.format("--hive-%s --hive-database %s --hive-cluster %s", data.getOperation(), data.getHiveDB(), clusterName)); + } return name.toString(); } @@ -198,8 +212,10 @@ public class SqoopHook extends SqoopJobDataPublisher { static String getSqoopDBStoreName(SqoopJobDataPublisher.Data data) { StringBuilder name = new StringBuilder(String.format("%s --url %s", data.getStoreType(), data.getUrl())); - if (StringUtils.isNotEmpty(data.getStoreTable())) { + if (StringUtils.isNotEmpty(data.getHiveTable())) { name.append(" --table ").append(data.getStoreTable()); + } else { + name.append(" --database ").append(data.getHiveDB()); } if (StringUtils.isNotEmpty(data.getStoreQuery())) {
