ATLAS-1092 Add Table.CreateTime to process qualified Name for all hive_process (sumasai via shwethags)

(cherry picked from commit d1940ba75df69cada0dea11708fc9052071bd41e)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/971c8cba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/971c8cba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/971c8cba

Branch: refs/heads/0.7-incubating
Commit: 971c8cba2bbc3df5648ee1a77ff8d95905a3c6a2
Parents: 5b67cb6
Author: Shwetha GS <[email protected]>
Authored: Fri Aug 5 11:50:25 2016 +0530
Committer: Madhan Neethiraj <[email protected]>
Committed: Thu Dec 22 14:47:45 2016 -0800

----------------------------------------------------------------------
 .../atlas/hive/bridge/HiveMetaStoreBridge.java  |  2 +-
 .../org/apache/atlas/hive/hook/HiveHook.java    | 30 +++++++++++++++-----
 .../org/apache/atlas/hive/hook/HiveHookIT.java  | 26 ++++++++++++-----
 release-log.txt                                 |  1 +
 4 files changed, 44 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/971c8cba/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 270ecf4..eb08c37 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -408,7 +408,7 @@ public class HiveMetaStoreBridge {
         return createOrUpdateTableInstance(dbReference, null, hiveTable);
     }
 
-    private static Date getTableCreatedTime(Table table) {
+    public static Date getTableCreatedTime(Table table) {
         return new Date(table.getTTable().getCreateTime() * 
MILLIS_CONVERT_FACTOR);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/971c8cba/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 40e8c5f..7905bcf 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -786,9 +786,9 @@ public class HiveHook extends AtlasHook implements 
ExecuteWithHookContext {
             LOG.debug("Ignoring HDFS paths in qualifiedName for {} {} ", op, 
eventContext.getQueryStr());
         }
 
-        addInputs(op, sortedHiveInputs, buffer, hiveInputsMap, 
ignoreHDFSPathsinQFName);
+        addInputs(dgiBridge, op, sortedHiveInputs, buffer, hiveInputsMap, 
ignoreHDFSPathsinQFName);
         buffer.append(IO_SEP);
-        addOutputs(op, sortedHiveOutputs, buffer, hiveOutputsMap, 
ignoreHDFSPathsinQFName);
+        addOutputs(dgiBridge, op, sortedHiveOutputs, buffer, hiveOutputsMap, 
ignoreHDFSPathsinQFName);
         LOG.info("Setting process qualified name to {}", buffer);
         return buffer.toString();
     }
@@ -815,7 +815,7 @@ public class HiveHook extends AtlasHook implements 
ExecuteWithHookContext {
         return false;
     }
 
-    private static void addInputs(HiveOperation op, SortedSet<ReadEntity> 
sortedInputs, StringBuilder buffer, final Map<ReadEntity, Referenceable> refs, 
final boolean ignoreHDFSPathsInQFName) {
+    private static void addInputs(HiveMetaStoreBridge hiveBridge, 
HiveOperation op, SortedSet<ReadEntity> sortedInputs, StringBuilder buffer, 
final Map<ReadEntity, Referenceable> refs, final boolean 
ignoreHDFSPathsInQFName) throws HiveException {
         if (refs != null) {
             if (sortedInputs != null) {
                 Set<String> dataSetsProcessed = new LinkedHashSet<>();
@@ -827,7 +827,12 @@ public class HiveHook extends AtlasHook implements 
ExecuteWithHookContext {
                             (Type.DFS_DIR.equals(input.getType()) || 
Type.LOCAL_DIR.equals(input.getType()))) {
                             LOG.debug("Skipping dfs dir input addition to 
process qualified name {} ", input.getName());
                         } else if (refs.containsKey(input)) {
-                            addDataset(buffer, refs.get(input));
+                            if ( input.getType() == Type.PARTITION || 
input.getType() == Type.TABLE) {
+                                final Date createTime = 
HiveMetaStoreBridge.getTableCreatedTime(hiveBridge.hiveClient.getTable(input.getTable().getDbName(),
 input.getTable().getTableName()));
+                                addDataset(buffer, refs.get(input), 
createTime.getTime());
+                            } else {
+                                addDataset(buffer, refs.get(input));
+                            }
                         }
                         dataSetsProcessed.add(input.getName().toLowerCase());
                     }
@@ -837,6 +842,12 @@ public class HiveHook extends AtlasHook implements 
ExecuteWithHookContext {
         }
     }
 
+    private static void addDataset(StringBuilder buffer, Referenceable ref, 
final long createTime) {
+        addDataset(buffer, ref);
+        buffer.append(SEP);
+        buffer.append(createTime);
+    }
+
     private static void addDataset(StringBuilder buffer, Referenceable ref) {
         buffer.append(SEP);
         String dataSetQlfdName = (String) 
ref.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME);
@@ -844,11 +855,11 @@ public class HiveHook extends AtlasHook implements 
ExecuteWithHookContext {
         buffer.append(dataSetQlfdName.toLowerCase().replaceAll("/", ""));
     }
 
-    private static void addOutputs(HiveOperation op, SortedSet<WriteEntity> 
sortedOutputs, StringBuilder buffer, final Map<WriteEntity, Referenceable> 
refs, final boolean ignoreHDFSPathsInQFName) {
+    private static void addOutputs(HiveMetaStoreBridge hiveBridge, 
HiveOperation op, SortedSet<WriteEntity> sortedOutputs, StringBuilder buffer, 
final Map<WriteEntity, Referenceable> refs, final boolean 
ignoreHDFSPathsInQFName) throws HiveException {
         if (refs != null) {
             Set<String> dataSetsProcessed = new LinkedHashSet<>();
             if (sortedOutputs != null) {
-                for (Entity output : sortedOutputs) {
+                for (WriteEntity output : sortedOutputs) {
                     final Entity entity = output;
                     if 
(!dataSetsProcessed.contains(output.getName().toLowerCase())) {
                         //HiveOperation.QUERY type encompasses INSERT, 
INSERT_OVERWRITE, UPDATE, DELETE, PATH_WRITE operations
@@ -860,7 +871,12 @@ public class HiveHook extends AtlasHook implements 
ExecuteWithHookContext {
                             (Type.DFS_DIR.equals(output.getType()) || 
Type.LOCAL_DIR.equals(output.getType()))) {
                             LOG.debug("Skipping dfs dir output addition to 
process qualified name {} ", output.getName());
                         } else if (refs.containsKey(output)) {
-                            addDataset(buffer, refs.get(output));
+                            if ( output.getType() == Type.PARTITION || 
output.getType() == Type.TABLE) {
+                                final Date createTime = 
HiveMetaStoreBridge.getTableCreatedTime(hiveBridge.hiveClient.getTable(output.getTable().getDbName(),
 output.getTable().getTableName()));
+                                addDataset(buffer, refs.get(output), 
createTime.getTime());
+                            } else {
+                                addDataset(buffer, refs.get(output));
+                            }
                         }
                         dataSetsProcessed.add(output.getName().toLowerCase());
                     }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/971c8cba/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index e61e916..9258b3e 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -544,13 +544,25 @@ public class HiveHookIT extends HiveITBase {
         Referenceable processRef1 = validateProcess(event, expectedInputs, 
outputs);
 
         //Test sorting of tbl names
-        SortedSet<String> sortedTblNames = new TreeSet<>();
-        sortedTblNames.add(getQualifiedTblName(inputTable1Name));
-        sortedTblNames.add(getQualifiedTblName(inputTable2Name));
-
-        //Verify sorted orer of inputs in qualified name
-        Assert.assertEquals(Joiner.on(SEP).join("QUERY", 
sortedTblNames.first(), sortedTblNames.last()) + IO_SEP + SEP + 
Joiner.on(SEP).join(WriteEntity.WriteType.INSERT.name(), 
getQualifiedTblName(insertTableName))
-            , processRef1.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME));
+        SortedSet<String> sortedTblNames = new TreeSet<String>();
+        sortedTblNames.add(inputTable1Name.toLowerCase());
+        sortedTblNames.add(inputTable2Name.toLowerCase());
+
+        //Verify sorted order of inputs in qualified name
+        Assert.assertEquals(
+            processRef1.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME),
+
+            Joiner.on(SEP).join("QUERY",
+                getQualifiedTblName(sortedTblNames.first()),
+                
HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB,
 sortedTblNames.first())).getTime(),
+                getQualifiedTblName(sortedTblNames.last()),
+                
HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB,
 sortedTblNames.last())).getTime())
+                + IO_SEP + SEP
+                + Joiner.on(SEP).
+                join(WriteEntity.WriteType.INSERT.name(),
+                    getQualifiedTblName(insertTableName),
+                    
HiveMetaStoreBridge.getTableCreatedTime(hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB,
 insertTableName)).getTime())
+            );
 
         //Rerun same query. Should result in same process
         runCommandWithDelay(query, 1000);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/971c8cba/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 82d3405..6ce270f 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -27,6 +27,7 @@ ATLAS-409 Atlas will not import avro tables with schema read 
from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons 
(venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-1092 Add Table.CreateTime to process qualified Name for all hive_process 
(sumasai via shwethags)
 ATLAS-1096 Modify HveMetaStoreBridge.import to use getEntity instead of DSL 
(sumasai via shwethags)
 ATLAS-1091 Improvement in DSL search functionality. (kevalbhatt)
 ATLAS-1080 Regression - UI - hive_storagedesc is shown as "undefined" in 
UI.(kevalbhatt)

Reply via email to