Repository: incubator-atlas Updated Branches: refs/heads/master cc503701d -> 384c33585
Atlas changes to support Hive hook for Hive2 Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/384c3358 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/384c3358 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/384c3358 Branch: refs/heads/master Commit: 384c33585e549dbd9f729e3bb61c6fda2448016d Parents: cc50370 Author: Madhan Neethiraj <[email protected]> Authored: Thu Jan 12 12:20:15 2017 +0530 Committer: Vimal Sharma <[email protected]> Committed: Thu Jan 12 12:21:23 2017 +0530 ---------------------------------------------------------------------- .../atlas/hive/bridge/ColumnLineageUtils.java | 54 +++++++++++++++++--- 1 file changed, 47 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/384c3358/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java index c54fdb3..663fcdc 100644 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/ColumnLineageUtils.java @@ -25,7 +25,10 @@ import org.apache.hadoop.hive.ql.hooks.LineageInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -62,18 +65,55 @@ public class ColumnLineageUtils { for (Map.Entry<LineageInfo.DependencyKey, LineageInfo.Dependency> e : lInfo.entrySet()) { List<HiveColumnLineageInfo> l = new ArrayList<>(); String k = getQualifiedName(e.getKey()); - for (LineageInfo.BaseColumnInfo iCol : e.getValue().getBaseCols()) { - String db = iCol.getTabAlias().getTable().getDbName(); - String table = iCol.getTabAlias().getTable().getTableName(); - String colQualifiedName = iCol.getColumn() == null ? db + "." + table : db + "." + table + "." + iCol.getColumn().getName(); - l.add(new HiveColumnLineageInfo(e.getValue(), colQualifiedName)); + + if (LOG.isDebugEnabled()) { + LOG.debug("buildLineageMap(): key={}; value={}", e.getKey(), e.getValue()); + } + + Collection<LineageInfo.BaseColumnInfo> baseCols = getBaseCols(e.getValue()); + + if (baseCols != null) { + for (LineageInfo.BaseColumnInfo iCol : baseCols) { + String db = iCol.getTabAlias().getTable().getDbName(); + String table = iCol.getTabAlias().getTable().getTableName(); + String colQualifiedName = iCol.getColumn() == null ? db + "." + table : db + "." + table + "." + iCol.getColumn().getName(); + l.add(new HiveColumnLineageInfo(e.getValue(), colQualifiedName)); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Setting lineage --> Input: {} ==> Output : {}", l, k); + } + m.put(k, l); } - LOG.debug("Setting lineage --> Input: {} ==> Output : {}", l, k); - m.put(k, l); } return m; } + static Collection<LineageInfo.BaseColumnInfo> getBaseCols(LineageInfo.Dependency lInfoDep) { + Collection<LineageInfo.BaseColumnInfo> ret = null; + + if (lInfoDep != null) { + try { + Method getBaseColsMethod = lInfoDep.getClass().getMethod("getBaseCols"); + + Object retGetBaseCols = getBaseColsMethod.invoke(lInfoDep); + + if (retGetBaseCols != null) { + if (retGetBaseCols instanceof Collection) { + ret = (Collection) retGetBaseCols; + } else { + LOG.warn("{}: unexpected return type from LineageInfo.Dependency.getBaseCols(), expected type {}", + retGetBaseCols.getClass().getName(), "Collection"); + } + } + } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException ex) { + LOG.warn("getBaseCols()", ex); + } + } + + return ret; + } + static String[] extractComponents(String qualifiedName) { String[] comps = qualifiedName.split("\\."); int lastIdx = comps.length - 1;
