Repository: atlas
Updated Branches:
  refs/heads/master 9cd28bbed -> 5618ad4ce


ATLAS-2439: updated Sqoop hook to use V2 notifications

Signed-off-by: Madhan Neethiraj <mad...@apache.org>
(cherry picked from commit e8908dbfe1d4dfe641cc7a802625f396aa0a399d)


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/5618ad4c
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/5618ad4c
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/5618ad4c

Branch: refs/heads/master
Commit: 5618ad4ce3b608a5c6ab98dc794d231fb7a3673e
Parents: 9cd28bb
Author: rdsolani <rdsol...@gmail.com>
Authored: Thu Feb 1 19:41:33 2018 +0530
Committer: Madhan Neethiraj <mad...@apache.org>
Committed: Tue Feb 13 14:31:29 2018 -0800

----------------------------------------------------------------------
 .../org/apache/atlas/sqoop/hook/SqoopHook.java  | 212 ++++++++++---------
 1 file changed, 113 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/5618ad4c/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
----------------------------------------------------------------------
diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
index 5ded92c..4ca3280 100644
--- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
+++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java
@@ -26,10 +26,12 @@ import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataTypes;
 import org.apache.atlas.hook.AtlasHook;
 import org.apache.atlas.hook.AtlasHookException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.notification.HookNotification;
-import org.apache.atlas.v1.model.instance.Referenceable;
-import 
org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
+import 
org.apache.atlas.model.notification.HookNotification.EntityUpdateRequestV2;
 import org.apache.atlas.sqoop.model.SqoopDataTypes;
+import org.apache.atlas.type.AtlasTypeUtil;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sqoop.SqoopJobDataPublisher;
@@ -47,151 +49,163 @@ import java.util.Properties;
  * AtlasHook sends lineage information to the AtlasSever.
  */
 public class SqoopHook extends SqoopJobDataPublisher {
-
     private static final Logger LOG = LoggerFactory.getLogger(SqoopHook.class);
-    public static final String CONF_PREFIX = "atlas.hook.sqoop.";
-    public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries";
 
-    public static final String ATLAS_CLUSTER_NAME = "atlas.cluster.name";
+    public static final String CONF_PREFIX          = "atlas.hook.sqoop.";
+    public static final String HOOK_NUM_RETRIES     = CONF_PREFIX + 
"numRetries";
+    public static final String ATLAS_CLUSTER_NAME   = "atlas.cluster.name";
     public static final String DEFAULT_CLUSTER_NAME = "primary";
 
-    public static final String USER = "userName";
-    public static final String DB_STORE_TYPE = "dbStoreType";
+    public static final String USER           = "userName";
+    public static final String DB_STORE_TYPE  = "dbStoreType";
     public static final String DB_STORE_USAGE = "storeUse";
-    public static final String SOURCE = "source";
-    public static final String DESCRIPTION = "description";
-    public static final String STORE_URI = "storeUri";
-    public static final String OPERATION = "operation";
-    public static final String START_TIME = "startTime";
-    public static final String END_TIME = "endTime";
-    public static final String CMD_LINE_OPTS = "commandlineOpts";
-    // multiple inputs and outputs for process
-    public static final String INPUTS = "inputs";
-    public static final String OUTPUTS = "outputs";
+    public static final String SOURCE         = "source";
+    public static final String DESCRIPTION    = "description";
+    public static final String STORE_URI      = "storeUri";
+    public static final String OPERATION      = "operation";
+    public static final String START_TIME     = "startTime";
+    public static final String END_TIME       = "endTime";
+    public static final String CMD_LINE_OPTS  = "commandlineOpts";
+    public static final String INPUTS         = "inputs";
+    public static final String OUTPUTS        = "outputs";
 
     static {
         
org.apache.hadoop.conf.Configuration.addDefaultResource("sqoop-site.xml");
     }
 
-    public Referenceable createHiveDatabaseInstance(String clusterName, String 
dbName) {
-        Referenceable dbRef = new 
Referenceable(HiveDataTypes.HIVE_DB.getName());
-        dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName);
-        dbRef.set(AtlasClient.NAME, dbName);
-        dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName));
-        return dbRef;
-    }
+    @Override
+    public void publish(SqoopJobDataPublisher.Data data) throws 
AtlasHookException {
+        try {
+            Configuration atlasProperties = ApplicationProperties.get();
+            String        clusterName     = 
atlasProperties.getString(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+            AtlasEntity   entDbStore      = createDBStoreInstance(data);
+            AtlasEntity   entHiveDb       = 
createHiveDatabaseInstance(clusterName, data.getHiveDB());
+            AtlasEntity   entHiveTable    = createHiveTableInstance(entHiveDb, 
data.getHiveTable());
+            AtlasEntity   entProcess      = 
createSqoopProcessInstance(entDbStore, entHiveTable, data, clusterName);
+
+            AtlasEntitiesWithExtInfo entities = new 
AtlasEntitiesWithExtInfo(entProcess);
 
-    public Referenceable createHiveTableInstance(String clusterName, 
Referenceable dbRef,
-                                                 String tableName, String 
dbName) {
-            Referenceable tableRef = new 
Referenceable(HiveDataTypes.HIVE_TABLE.getName());
-            tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
-                    HiveMetaStoreBridge.getTableQualifiedName(clusterName, 
dbName, tableName));
-            tableRef.set(AtlasClient.NAME, tableName.toLowerCase());
-            tableRef.set(HiveMetaStoreBridge.DB, dbRef);
-            return tableRef;
+            entities.addReferredEntity(entDbStore);
+            entities.addReferredEntity(entHiveDb);
+            entities.addReferredEntity(entHiveTable);
+
+            HookNotification message  = new 
EntityUpdateRequestV2(AtlasHook.getUser(), entities);
+
+            AtlasHook.notifyEntities(Arrays.asList(message), 
atlasProperties.getInt(HOOK_NUM_RETRIES, 3));
+        } catch(Exception e) {
+            throw new AtlasHookException("SqoopHook.publish() failed.", e);
         }
+    }
 
-    private Referenceable createDBStoreInstance(SqoopJobDataPublisher.Data 
data)
-            throws ImportException {
+    private AtlasEntity createHiveDatabaseInstance(String clusterName, String 
dbName) {
+        AtlasEntity entHiveDb     = new 
AtlasEntity(HiveDataTypes.HIVE_DB.getName());
+        String      qualifiedName = 
HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName);
+
+        entHiveDb.setAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, 
clusterName);
+        entHiveDb.setAttribute(AtlasClient.NAME, dbName);
+        entHiveDb.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
qualifiedName);
+
+        return entHiveDb;
+    }
 
-        Referenceable storeRef = new 
Referenceable(SqoopDataTypes.SQOOP_DBDATASTORE.getName());
+    private AtlasEntity createHiveTableInstance(AtlasEntity entHiveDb, String 
tableName) {
+        AtlasEntity entHiveTable  = new 
AtlasEntity(HiveDataTypes.HIVE_TABLE.getName());
+        String      qualifiedName = 
HiveMetaStoreBridge.getTableQualifiedName((String)entHiveDb.getAttribute(AtlasConstants.CLUSTER_NAME_ATTRIBUTE),
 (String)entHiveDb.getAttribute(AtlasClient.NAME), tableName);
+
+        entHiveTable.setAttribute(AtlasClient.NAME, tableName.toLowerCase());
+        entHiveTable.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
qualifiedName);
+        entHiveTable.setAttribute(HiveMetaStoreBridge.DB, 
AtlasTypeUtil.getAtlasObjectId(entHiveDb));
+
+        return entHiveTable;
+    }
+
+    private AtlasEntity createDBStoreInstance(SqoopJobDataPublisher.Data data) 
throws ImportException {
         String table = data.getStoreTable();
         String query = data.getStoreQuery();
+
         if (StringUtils.isBlank(table) && StringUtils.isBlank(query)) {
             throw new ImportException("Both table and query cannot be empty 
for DBStoreInstance");
         }
 
-        String usage = table != null ? "TABLE" : "QUERY";
+        String usage  = table != null ? "TABLE" : "QUERY";
         String source = table != null ? table : query;
-        String name = getSqoopDBStoreName(data);
-        storeRef.set(AtlasClient.NAME, name);
-        storeRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
-        storeRef.set(SqoopHook.DB_STORE_TYPE, data.getStoreType());
-        storeRef.set(SqoopHook.DB_STORE_USAGE, usage);
-        storeRef.set(SqoopHook.STORE_URI, data.getUrl());
-        storeRef.set(SqoopHook.SOURCE, source);
-        storeRef.set(SqoopHook.DESCRIPTION, "");
-        storeRef.set(AtlasClient.OWNER, data.getUser());
-        return storeRef;
+        String name   = getSqoopDBStoreName(data);
+
+        AtlasEntity entDbStore = new 
AtlasEntity(SqoopDataTypes.SQOOP_DBDATASTORE.getName());
+
+        entDbStore.setAttribute(AtlasClient.NAME, name);
+        entDbStore.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
name);
+        entDbStore.setAttribute(SqoopHook.DB_STORE_TYPE, data.getStoreType());
+        entDbStore.setAttribute(SqoopHook.DB_STORE_USAGE, usage);
+        entDbStore.setAttribute(SqoopHook.STORE_URI, data.getUrl());
+        entDbStore.setAttribute(SqoopHook.SOURCE, source);
+        entDbStore.setAttribute(SqoopHook.DESCRIPTION, "");
+        entDbStore.setAttribute(AtlasClient.OWNER, data.getUser());
+
+        return entDbStore;
     }
 
-    private Referenceable createSqoopProcessInstance(Referenceable dbStoreRef, 
Referenceable hiveTableRef,
-                                                     
SqoopJobDataPublisher.Data data, String clusterName) {
-        Referenceable procRef = new 
Referenceable(SqoopDataTypes.SQOOP_PROCESS.getName());
-        final String sqoopProcessName = getSqoopProcessName(data, clusterName);
-        procRef.set(AtlasClient.NAME, sqoopProcessName);
-        procRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
sqoopProcessName);
-        procRef.set(SqoopHook.OPERATION, data.getOperation());
-        if (isImportOperation(data)) {
-            procRef.set(SqoopHook.INPUTS, dbStoreRef);
-            procRef.set(SqoopHook.OUTPUTS, hiveTableRef);
-        } else {
-            procRef.set(SqoopHook.INPUTS, hiveTableRef);
-            procRef.set(SqoopHook.OUTPUTS, dbStoreRef);
-        }
-        procRef.set(SqoopHook.USER, data.getUser());
-        procRef.set(SqoopHook.START_TIME, new Date(data.getStartTime()));
-        procRef.set(SqoopHook.END_TIME, new Date(data.getEndTime()));
+    private AtlasEntity createSqoopProcessInstance(AtlasEntity entDbStore, 
AtlasEntity entHiveTable, SqoopJobDataPublisher.Data data, String clusterName) {
+        AtlasEntity         entProcess       = new 
AtlasEntity(SqoopDataTypes.SQOOP_PROCESS.getName());
+        String              sqoopProcessName = getSqoopProcessName(data, 
clusterName);
+        Map<String, String> sqoopOptionsMap  = new HashMap<>();
+        Properties          options          = data.getOptions();
 
-        Map<String, String> sqoopOptionsMap = new HashMap<>();
-        Properties options = data.getOptions();
         for (Object k : options.keySet()) {
             sqoopOptionsMap.put((String)k, (String) options.get(k));
         }
-        procRef.set(SqoopHook.CMD_LINE_OPTS, sqoopOptionsMap);
 
-        return procRef;
+        entProcess.setAttribute(AtlasClient.NAME, sqoopProcessName);
+        entProcess.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, 
sqoopProcessName);
+        entProcess.setAttribute(SqoopHook.OPERATION, data.getOperation());
+
+        if (isImportOperation(data)) {
+            entProcess.setAttribute(SqoopHook.INPUTS, 
AtlasTypeUtil.getAtlasObjectId(entDbStore));
+            entProcess.setAttribute(SqoopHook.OUTPUTS, 
AtlasTypeUtil.getAtlasObjectId(entHiveTable));
+        } else {
+            entProcess.setAttribute(SqoopHook.INPUTS, 
AtlasTypeUtil.getAtlasObjectId(entHiveTable));
+            entProcess.setAttribute(SqoopHook.OUTPUTS, 
AtlasTypeUtil.getAtlasObjectId(entDbStore));
+        }
+
+        entProcess.setAttribute(SqoopHook.USER, data.getUser());
+        entProcess.setAttribute(SqoopHook.START_TIME, new 
Date(data.getStartTime()));
+        entProcess.setAttribute(SqoopHook.END_TIME, new 
Date(data.getEndTime()));
+        entProcess.setAttribute(SqoopHook.CMD_LINE_OPTS, sqoopOptionsMap);
+
+        return entProcess;
+    }
+
+    private boolean isImportOperation(SqoopJobDataPublisher.Data data) {
+        return data.getOperation().toLowerCase().equals("import");
     }
 
     static String getSqoopProcessName(Data data, String clusterName) {
-        StringBuilder name = new StringBuilder(String.format("sqoop %s 
--connect %s", data.getOperation(),
-                data.getUrl()));
+        StringBuilder name = new StringBuilder(String.format("sqoop %s 
--connect %s", data.getOperation(), data.getUrl()));
+
         if (StringUtils.isNotEmpty(data.getStoreTable())) {
             name.append(" --table ").append(data.getStoreTable());
         }
+
         if (StringUtils.isNotEmpty(data.getStoreQuery())) {
             name.append(" --query ").append(data.getStoreQuery());
         }
-        name.append(String.format(" --hive-%s --hive-database %s --hive-table 
%s --hive-cluster %s",
-                data.getOperation(), data.getHiveDB().toLowerCase(), 
data.getHiveTable().toLowerCase(), clusterName));
+
+        name.append(String.format(" --hive-%s --hive-database %s --hive-table 
%s --hive-cluster %s", data.getOperation(), data.getHiveDB().toLowerCase(), 
data.getHiveTable().toLowerCase(), clusterName));
+
         return name.toString();
     }
 
     static String getSqoopDBStoreName(SqoopJobDataPublisher.Data data)  {
         StringBuilder name = new StringBuilder(String.format("%s --url %s", 
data.getStoreType(), data.getUrl()));
+
         if (StringUtils.isNotEmpty(data.getStoreTable())) {
             name.append(" --table ").append(data.getStoreTable());
         }
+
         if (StringUtils.isNotEmpty(data.getStoreQuery())) {
             name.append(" --query ").append(data.getStoreQuery());
         }
-        return name.toString();
-    }
 
-    static boolean isImportOperation(SqoopJobDataPublisher.Data data) {
-        return data.getOperation().toLowerCase().equals("import");
-    }
-
-    @Override
-    public void publish(SqoopJobDataPublisher.Data data) throws 
AtlasHookException {
-        try {
-            Configuration atlasProperties = ApplicationProperties.get();
-            String clusterName = atlasProperties.getString(ATLAS_CLUSTER_NAME, 
DEFAULT_CLUSTER_NAME);
-
-            Referenceable dbStoreRef = createDBStoreInstance(data);
-            Referenceable dbRef = createHiveDatabaseInstance(clusterName, 
data.getHiveDB());
-            Referenceable hiveTableRef = createHiveTableInstance(clusterName, 
dbRef,
-                    data.getHiveTable(), data.getHiveDB());
-            Referenceable procRef = createSqoopProcessInstance(dbStoreRef, 
hiveTableRef, data, clusterName);
-
-            int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3);
-            HookNotification message =
-                    new EntityCreateRequest(AtlasHook.getUser(), dbStoreRef, 
dbRef, hiveTableRef, procRef);
-            AtlasHook.notifyEntities(Arrays.asList(message), maxRetries);
-        }
-        catch(Exception e) {
-            throw new AtlasHookException("SqoopHook.publish() failed.", e);
-        }
+        return name.toString();
     }
 }

Reply via email to