This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-1.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-1.0 by this push:
     new be98213  ATLAS-3056: updated rdbms types to remove use of 
ownedRef/inverseRef
be98213 is described below

commit be98213f5d60d6af7d485782dc077e06e92b0089
Author: Madhan Neethiraj <mad...@apache.org>
AuthorDate: Thu Feb 21 18:16:34 2019 -0800

    ATLAS-3056: updated rdbms types to remove use of ownedRef/inverseRef
    
    (cherry picked from commit f103e1438bbfc0c61912ded3551e22578876fbbe)
    (cherry picked from commit 5d99c40a51d910d3858d4d25e321afad5f12996d)
---
 addons/models/2000-RDBMS/2010-rdbms_model.json     |   6 +-
 .../003-remove-rdbms-legacy-attributes.json        |  90 +++++++++++++
 .../apache/atlas/model/instance/AtlasStruct.java   |   6 +-
 .../bootstrap/AtlasTypeDefStoreInitializer.java    |  25 +++-
 .../graph/v2/AtlasRelationshipDefStoreV2.java      |  11 +-
 .../notification/NotificationHookConsumer.java     |  30 ++++-
 .../preprocessor/EntityPreprocessor.java           |  39 +++++-
 .../preprocessor/PreprocessorContext.java          |  86 +++++++++----
 .../preprocessor/RdbmsPreprocessor.java            | 139 +++++++++++++++++++++
 9 files changed, 384 insertions(+), 48 deletions(-)

diff --git a/addons/models/2000-RDBMS/2010-rdbms_model.json 
b/addons/models/2000-RDBMS/2010-rdbms_model.json
index 386446c..e72e13a 100644
--- a/addons/models/2000-RDBMS/2010-rdbms_model.json
+++ b/addons/models/2000-RDBMS/2010-rdbms_model.json
@@ -21,7 +21,7 @@
                 {
                     "name": "platform",
                     "typeName": "string",
-                    "isOptional": false,
+                    "isOptional": true,
                     "cardinality": "SINGLE",
                     "isUnique": false,
                     "isIndexable": true
@@ -99,7 +99,7 @@
                 {
                     "name": "prodOrOther",
                     "typeName": "string",
-                    "isOptional": false,
+                    "isOptional": true,
                     "cardinality": "SINGLE",
                     "isUnique": false,
                     "isIndexable": true
@@ -259,7 +259,7 @@
                 {
                     "name": "data_type",
                     "typeName": "string",
-                    "isOptional": false,
+                    "isOptional": true,
                     "cardinality": "SINGLE",
                     "isUnique": false,
                     "isIndexable": true
diff --git 
a/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json 
b/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json
new file mode 100644
index 0000000..d087c66
--- /dev/null
+++ b/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json
@@ -0,0 +1,90 @@
+{
+  "patches": [
+    {
+      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "typeName":        "rdbms_instance_databases",
+      "applyToVersion":  "1.0",
+      "updateToVersion": "1.1",
+      "params": {
+        "relationshipLabel":    "__rdbms_instance.databases",
+        "relationshipCategory": "COMPOSITION"
+      }
+    },
+    {
+      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "typeName":        "rdbms_db_tables",
+      "applyToVersion":  "1.0",
+      "updateToVersion": "1.1",
+      "params": {
+        "relationshipLabel":    "__rdbms_db.tables",
+        "relationshipCategory": "COMPOSITION"
+      }
+    },
+    {
+      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "typeName":        "rdbms_table_columns",
+      "applyToVersion":  "1.0",
+      "updateToVersion": "1.1",
+      "params": {
+        "relationshipLabel":    "__rdbms_table.columns",
+        "relationshipCategory": "COMPOSITION"
+      }
+    },
+    {
+      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "typeName":        "rdbms_table_indexes",
+      "applyToVersion":  "1.0",
+      "updateToVersion": "1.1",
+      "params": {
+        "relationshipLabel":    "__rdbms_table.indexes",
+        "relationshipCategory": "COMPOSITION"
+      }
+    },
+    {
+      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "typeName":        "rdbms_table_foreign_key",
+      "applyToVersion":  "1.0",
+      "updateToVersion": "1.1",
+      "params": {
+        "relationshipLabel":    "__rdbms_table.foreign_keys",
+        "relationshipCategory": "COMPOSITION"
+      }
+    },
+    {
+      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "typeName":        "rdbms_index_columns",
+      "applyToVersion":  "1.0",
+      "updateToVersion": "1.1",
+      "params": {
+        "relationshipLabel": "__rdbms_index.columns"
+      }
+    },
+    {
+      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "typeName":        "rdbms_foreign_key_key_columns",
+      "applyToVersion":  "1.0",
+      "updateToVersion": "1.1",
+      "params": {
+        "relationshipLabel": "__rdbms_foreign_key.key_columns"
+      }
+    },
+    {
+      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "typeName":        "rdbms_foreign_key_table_references",
+      "applyToVersion":  "1.0",
+      "updateToVersion": "1.1",
+      "params": {
+        "relationshipLabel": "__rdbms_foreign_key.references_table"
+      }
+    },
+    {
+      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "typeName":        "rdbms_foreign_key_column_references",
+      "applyToVersion":  "1.0",
+      "updateToVersion": "1.1",
+      "params": {
+        "relationshipLabel": "__rdbms_foreign_key.references_columns"
+      }
+    }
+  ]
+}
diff --git 
a/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java 
b/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java
index 18e7407..027b160 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasStruct.java
@@ -147,12 +147,10 @@ public class AtlasStruct implements Serializable {
         }
     }
 
-    public void removeAttribute(String name) {
+    public Object removeAttribute(String name) {
         Map<String, Object> a = this.attributes;
 
-        if (a != null && a.containsKey(name)) {
-            a.remove(name);
-        }
+        return a != null ? a.remove(name) : null;
     }
 
     public StringBuilder toString(StringBuilder sb) {
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
index a86282e..d12284e 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
@@ -35,6 +35,7 @@ import org.apache.atlas.model.typedef.AtlasEntityDef;
 import org.apache.atlas.model.typedef.AtlasEnumDef;
 import org.apache.atlas.model.typedef.AtlasEnumDef.AtlasEnumElementDef;
 import org.apache.atlas.model.typedef.AtlasRelationshipDef;
+import 
org.apache.atlas.model.typedef.AtlasRelationshipDef.RelationshipCategory;
 import org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
 import org.apache.atlas.model.typedef.AtlasStructDef;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
@@ -77,8 +78,9 @@ import static 
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
 @Service
 public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
     private static final Logger LOG = 
LoggerFactory.getLogger(AtlasTypeDefStoreInitializer.class);
-    public static final String PATCHES_FOLDER_NAME = "patches";
-    public static final String RELATIONSHIP_LABEL  = "relationshipLabel";
+    public static final String PATCHES_FOLDER_NAME   = "patches";
+    public static final String RELATIONSHIP_LABEL    = "relationshipLabel";
+    public static final String RELATIONSHIP_CATEGORY = "relationshipCategory";
 
     private final AtlasTypeDefStore atlasTypeDefStore;
     private final AtlasTypeRegistry atlasTypeRegistry;
@@ -730,13 +732,19 @@ public class AtlasTypeDefStoreInitializer implements 
ActiveStateChangeHandler {
                     AtlasEntityType         end1Type        = 
typeRegistry.getEntityTypeByName(end1Def.getType());
                     AtlasEntityType         end2Type        = 
typeRegistry.getEntityTypeByName(end2Def.getType());
 
-                    String newRelationshipLabel = null;
+                    String               newRelationshipLabel    = null;
+                    RelationshipCategory newRelationshipCategory = null;
 
                     if (patch.getParams() != null) {
-                        Object val = patch.getParams().get(RELATIONSHIP_LABEL);
+                        Object relLabel    = 
patch.getParams().get(RELATIONSHIP_LABEL);
+                        Object relCategory = 
patch.getParams().get(RELATIONSHIP_CATEGORY);
 
-                        if (val != null) {
-                            newRelationshipLabel = val.toString();
+                        if (relLabel != null) {
+                            newRelationshipLabel = relLabel.toString();
+                        }
+
+                        if (relCategory != null) {
+                            newRelationshipCategory = 
RelationshipCategory.valueOf(relCategory.toString());
                         }
                     }
 
@@ -763,6 +771,11 @@ public class AtlasTypeDefStoreInitializer implements 
ActiveStateChangeHandler {
                     AtlasEntityDef       updatedEntityDef2 = new 
AtlasEntityDef(end2Type.getEntityDef());
 
                     updatedDef.setRelationshipLabel(newRelationshipLabel);
+
+                    if (newRelationshipCategory != null) {
+                        
updatedDef.setRelationshipCategory(newRelationshipCategory);
+                    }
+
                     updatedDef.getEndDef1().setIsLegacyAttribute(false);
                     updatedDef.getEndDef2().setIsLegacyAttribute(false);
                     updatedDef.setTypeVersion(patch.getUpdateToVersion());
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
index bfee34e..35d0577 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.store.graph.v2;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContext;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.typedef.AtlasRelationshipDef;
 import 
org.apache.atlas.model.typedef.AtlasRelationshipDef.RelationshipCategory;
@@ -425,9 +426,13 @@ public class AtlasRelationshipDefStoreV2 extends 
AtlasAbstractDefStoreV2<AtlasRe
         RelationshipCategory newRelationshipCategory      = 
newRelationshipDef.getRelationshipCategory();
 
         if ( !existingRelationshipCategory.equals(newRelationshipCategory)){
-            throw new 
AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_CATEGORY_UPDATE,
-                    
newRelationshipDef.getName(),newRelationshipCategory.name(),
-                    existingRelationshipCategory.name() );
+            if (!RequestContext.get().isInTypePatching()) {
+                throw new 
AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_CATEGORY_UPDATE,
+                        newRelationshipDef.getName(), 
newRelationshipCategory.name(),
+                        existingRelationshipCategory.name());
+            } else {
+                LOG.warn("RELATIONSHIP UPDATE: relationship category from {} 
to {} for {}", existingRelationshipCategory.name(), 
newRelationshipCategory.name(), newRelationshipDef.getName());
+            }
         }
 
         AtlasRelationshipEndDef existingEnd1 = 
existingRelationshipDef.getEndDef1();
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 53fd117..eab5d51 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -123,6 +123,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN   
              = 
"atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
     public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN    
              = 
"atlas.notification.consumer.preprocess.hive_table.prune.pattern";
     public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE       
              = "atlas.notification.consumer.preprocess.hive_table.cache.size";
+    public static final String 
CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS         = 
"atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
 
     public static final int SERVER_READY_WAIT_TIME_MS = 1000;
 
@@ -142,6 +143,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     private final List<Pattern>                 hiveTablesToIgnore = new 
ArrayList<>();
     private final List<Pattern>                 hiveTablesToPrune  = new 
ArrayList<>();
     private final Map<String, PreprocessAction> hiveTablesCache;
+    private final boolean                       rdbmsTypesRemoveOwnedRefAttrs;
     private final boolean                       preprocessEnabled;
 
     private NotificationInterface notificationInterface;
@@ -212,7 +214,8 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             hiveTablesCache = Collections.emptyMap();
         }
 
-        preprocessEnabled = !hiveTablesToIgnore.isEmpty() || 
!hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633;
+        rdbmsTypesRemoveOwnedRefAttrs = 
applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS,
 true);
+        preprocessEnabled             = !hiveTablesToIgnore.isEmpty() || 
!hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || 
rdbmsTypesRemoveOwnedRefAttrs;
 
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, 
skipHiveColumnLineageHive20633);
         LOG.info("{}={}", 
CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 
skipHiveColumnLineageHive20633InputsThreshold);
@@ -778,7 +781,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             return;
         }
 
-        PreprocessorContext context = new PreprocessorContext(kafkaMsg, 
hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache);
+        PreprocessorContext context = new PreprocessorContext(kafkaMsg, 
hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, 
rdbmsTypesRemoveOwnedRefAttrs);
 
         if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) {
             ignoreOrPruneHiveTables(context);
@@ -788,16 +791,35 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             skipHiveColumnLineage(context);
         }
 
+        if (rdbmsTypesRemoveOwnedRefAttrs) {
+            rdbmsTypeRemoveOwnedRefAttrs(context);
+        }
+
         context.moveRegisteredReferredEntities();
     }
 
+    private void rdbmsTypeRemoveOwnedRefAttrs(PreprocessorContext context) {
+        List<AtlasEntity> entities = context.getEntities();
+
+        if (entities != null) {
+            for (ListIterator<AtlasEntity> iter = entities.listIterator(); 
iter.hasNext(); ) {
+                AtlasEntity        entity       = iter.next();
+                EntityPreprocessor preprocessor = 
EntityPreprocessor.getRdbmsPreprocessor(entity.getTypeName());
+
+                if (preprocessor != null) {
+                    preprocessor.preprocess(entity, context);
+                }
+            }
+        }
+    }
+
     private void ignoreOrPruneHiveTables(PreprocessorContext context) {
         List<AtlasEntity> entities = context.getEntities();
 
         if (entities != null) {
             for (ListIterator<AtlasEntity> iter = entities.listIterator(); 
iter.hasNext(); ) {
                 AtlasEntity        entity       = iter.next();
-                EntityPreprocessor preprocessor = 
EntityPreprocessor.getPreprocessor(entity.getTypeName());
+                EntityPreprocessor preprocessor = 
EntityPreprocessor.getHivePreprocessor(entity.getTypeName());
 
                 if (preprocessor != null) {
                     preprocessor.preprocess(entity, context);
@@ -813,7 +835,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             if (referredEntities != null) {
                 for (Iterator<Map.Entry<String, AtlasEntity>> iter = 
referredEntities.entrySet().iterator(); iter.hasNext(); ) {
                     AtlasEntity        entity       = iter.next().getValue();
-                    EntityPreprocessor preprocessor = 
EntityPreprocessor.getPreprocessor(entity.getTypeName());
+                    EntityPreprocessor preprocessor = 
EntityPreprocessor.getHivePreprocessor(entity.getTypeName());
 
                     if (preprocessor != null) {
                         preprocessor.preprocess(entity, context);
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
index bdea14a..7eba27a 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
@@ -32,25 +32,38 @@ public abstract class EntityPreprocessor {
     public static final String TYPE_HIVE_PROCESS        = "hive_process";
     public static final String TYPE_HIVE_STORAGEDESC    = "hive_storagedesc";
     public static final String TYPE_HIVE_TABLE          = "hive_table";
+    public static final String TYPE_RDBMS_INSTANCE      = "rdbms_instance";
+    public static final String TYPE_RDBMS_DB            = "rdbms_db";
+    public static final String TYPE_RDBMS_TABLE         = "rdbms_table";
+    public static final String TYPE_RDBMS_COLUMN        = "rdbms_column";
+    public static final String TYPE_RDBMS_INDEX         = "rdbms_index";
+    public static final String TYPE_RDBMS_FOREIGN_KEY   = "rdbms_foreign_key";
 
     public static final String ATTRIBUTE_COLUMNS        = "columns";
     public static final String ATTRIBUTE_INPUTS         = "inputs";
     public static final String ATTRIBUTE_OUTPUTS        = "outputs";
     public static final String ATTRIBUTE_PARTITION_KEYS = "partitionKeys";
     public static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName";
+    public static final String ATTRIBUTE_NAME           = "name";
     public static final String ATTRIBUTE_SD             = "sd";
+    public static final String ATTRIBUTE_DB             = "db";
+    public static final String ATTRIBUTE_DATABASES      = "databases";
+    public static final String ATTRIBUTE_TABLES         = "tables";
+    public static final String ATTRIBUTE_INDEXES        = "indexes";
+    public static final String ATTRIBUTE_FOREIGN_KEYS   = "foreign_keys";
 
     public static final char   QNAME_SEP_CLUSTER_NAME = '@';
     public static final char   QNAME_SEP_ENTITY_NAME  = '.';
     public static final String QNAME_SD_SUFFIX        = "_storage";
 
-    private static final Map<String, EntityPreprocessor> PREPROCESSOR_MAP = 
new HashMap<>();
+    private static final Map<String, EntityPreprocessor> HIVE_PREPROCESSOR_MAP 
 = new HashMap<>();
+    private static final Map<String, EntityPreprocessor> 
RDBMS_PREPROCESSOR_MAP = new HashMap<>();
 
     private final String typeName;
 
 
     static {
-        EntityPreprocessor[] preprocessors = new EntityPreprocessor[] {
+        EntityPreprocessor[] hivePreprocessors = new EntityPreprocessor[] {
                                                                     new 
HivePreprocessor.HiveTablePreprocessor(),
                                                                     new 
HivePreprocessor.HiveColumnPreprocessor(),
                                                                     new 
HivePreprocessor.HiveProcessPreprocessor(),
@@ -58,8 +71,18 @@ public abstract class EntityPreprocessor {
                                                                     new 
HivePreprocessor.HiveStorageDescPreprocessor()
         };
 
-        for (EntityPreprocessor preprocessor : preprocessors) {
-            PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor);
+        EntityPreprocessor[] rdbmsPreprocessors = new EntityPreprocessor[] {
+                                                                    new 
RdbmsPreprocessor.RdbmsInstancePreprocessor(),
+                                                                    new 
RdbmsPreprocessor.RdbmsDbPreprocessor(),
+                                                                    new 
RdbmsPreprocessor.RdbmsTablePreprocessor()
+       };
+
+        for (EntityPreprocessor preprocessor : hivePreprocessors) {
+            HIVE_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), 
preprocessor);
+        }
+
+        for (EntityPreprocessor preprocessor : rdbmsPreprocessors) {
+            RDBMS_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), 
preprocessor);
         }
     }
 
@@ -74,8 +97,12 @@ public abstract class EntityPreprocessor {
     public abstract void preprocess(AtlasEntity entity, PreprocessorContext 
context);
 
 
-    public static EntityPreprocessor getPreprocessor(String typeName) {
-        return typeName != null ? PREPROCESSOR_MAP.get(typeName) : null;
+    public static EntityPreprocessor getHivePreprocessor(String typeName) {
+        return typeName != null ? HIVE_PREPROCESSOR_MAP.get(typeName) : null;
+    }
+
+    public static EntityPreprocessor getRdbmsPreprocessor(String typeName) {
+        return typeName != null ? RDBMS_PREPROCESSOR_MAP.get(typeName) : null;
     }
 
     public static String getQualifiedName(AtlasEntity entity) {
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index 2d2c09a..0f95fba 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -44,15 +44,17 @@ public class PreprocessorContext {
     private final List<Pattern>                       hiveTablesToIgnore;
     private final List<Pattern>                       hiveTablesToPrune;
     private final Map<String, PreprocessAction>       hiveTablesCache;
+    private final boolean                             
rdbmsTypesRemoveOwnedRefAttrs;
     private final Set<String>                         ignoredEntities        = 
new HashSet<>();
     private final Set<String>                         prunedEntities         = 
new HashSet<>();
     private final Set<String>                         referredEntitiesToMove = 
new HashSet<>();
 
-    public PreprocessorContext(AtlasKafkaMessage<HookNotification> 
kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> 
hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache) {
-        this.kafkaMessage       = kafkaMessage;
-        this.hiveTablesToIgnore = hiveTablesToIgnore;
-        this.hiveTablesToPrune  = hiveTablesToPrune;
-        this.hiveTablesCache    = hiveTablesCache;
+    public PreprocessorContext(AtlasKafkaMessage<HookNotification> 
kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> 
hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean 
rdbmsTypesRemoveOwnedRefAttrs) {
+        this.kafkaMessage                  = kafkaMessage;
+        this.hiveTablesToIgnore            = hiveTablesToIgnore;
+        this.hiveTablesToPrune             = hiveTablesToPrune;
+        this.hiveTablesCache               = hiveTablesCache;
+        this.rdbmsTypesRemoveOwnedRefAttrs = rdbmsTypesRemoveOwnedRefAttrs;
 
         final HookNotification  message = kafkaMessage.getMessage();
 
@@ -83,6 +85,8 @@ public class PreprocessorContext {
         return kafkaMessage.getPartition();
     }
 
+    public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return 
rdbmsTypesRemoveOwnedRefAttrs; }
+
     public List<AtlasEntity> getEntities() {
         return entitiesWithExtInfo != null ? entitiesWithExtInfo.getEntities() 
: null;
     }
@@ -95,6 +99,12 @@ public class PreprocessorContext {
         return entitiesWithExtInfo != null && guid != null ? 
entitiesWithExtInfo.getEntity(guid) : null;
     }
 
+    public AtlasEntity removeReferredEntity(String guid) {
+        Map<String, AtlasEntity> referredEntities = getReferredEntities();
+
+        return referredEntities != null && guid != null ? 
referredEntities.remove(guid) : null;
+    }
+
     public Set<String> getIgnoredEntities() { return ignoredEntities; }
 
     public Set<String> getPrunedEntities() { return prunedEntities; }
@@ -165,6 +175,14 @@ public class PreprocessorContext {
         }
     }
 
+    public void addToReferredEntitiesToMove(Collection<String> guids) {
+        if (guids != null) {
+            for (String guid : guids) {
+                addToReferredEntitiesToMove(guid);
+            }
+        }
+    }
+
     public void addToIgnoredEntities(Object obj) {
         collectGuids(obj, ignoredEntities);
     }
@@ -173,6 +191,14 @@ public class PreprocessorContext {
         collectGuids(obj, prunedEntities);
     }
 
+    public void removeRefAttributeAndRegisterToMove(AtlasEntity entity, String 
attrName) {
+        Set<String> guidsToMove = new HashSet<>();
+
+        collectGuids(entity.removeAttribute(attrName), guidsToMove);
+
+        addToReferredEntitiesToMove(guidsToMove);
+    }
+
     public void moveRegisteredReferredEntities() {
         List<AtlasEntity>        entities         = getEntities();
         Map<String, AtlasEntity> referredEntities = getReferredEntities();
@@ -202,38 +228,39 @@ public class PreprocessorContext {
         }
     }
 
-    public String getGuid(Object obj) {
+    public String getTypeName(Object obj) {
         Object ret = null;
 
         if (obj instanceof AtlasObjectId) {
-            ret = ((AtlasObjectId) obj).getGuid();
+            ret = ((AtlasObjectId) obj).getTypeName();
         } else if (obj instanceof Map) {
-            ret = ((Map) obj).get(AtlasObjectId.KEY_GUID);
+            ret = ((Map) obj).get(AtlasObjectId.KEY_TYPENAME);
         } else if (obj instanceof AtlasEntity) {
-            ret = ((AtlasEntity) obj).getGuid();
+            ret = ((AtlasEntity) obj).getTypeName();
         } else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) {
-            ret = ((AtlasEntity.AtlasEntityWithExtInfo) 
obj).getEntity().getGuid();
+            ret = ((AtlasEntity.AtlasEntityWithExtInfo) 
obj).getEntity().getTypeName();
         }
 
         return ret != null ? ret.toString() : null;
     }
 
+    public String getGuid(Object obj) {
+        Object ret = null;
 
-    private boolean isMatch(String name, List<Pattern> patterns) {
-        boolean ret = false;
-
-        for (Pattern p : patterns) {
-            if (p.matcher(name).matches()) {
-                ret = true;
-
-                break;
-            }
+        if (obj instanceof AtlasObjectId) {
+            ret = ((AtlasObjectId) obj).getGuid();
+        } else if (obj instanceof Map) {
+            ret = ((Map) obj).get(AtlasObjectId.KEY_GUID);
+        } else if (obj instanceof AtlasEntity) {
+            ret = ((AtlasEntity) obj).getGuid();
+        } else if (obj instanceof AtlasEntity.AtlasEntityWithExtInfo) {
+            ret = ((AtlasEntity.AtlasEntityWithExtInfo) 
obj).getEntity().getGuid();
         }
 
-        return ret;
+        return ret != null ? ret.toString() : null;
     }
 
-    private void collectGuids(Object obj, Set<String> guids) {
+    public void collectGuids(Object obj, Set<String> guids) {
         if (obj != null) {
             if (obj instanceof Collection) {
                 Collection objList = (Collection) obj;
@@ -247,11 +274,26 @@ public class PreprocessorContext {
         }
     }
 
-    private void collectGuid(Object obj, Set<String> guids) {
+    public void collectGuid(Object obj, Set<String> guids) {
         String guid = getGuid(obj);
 
         if (guid != null) {
             guids.add(guid);
         }
     }
+
+
+    private boolean isMatch(String name, List<Pattern> patterns) {
+        boolean ret = false;
+
+        for (Pattern p : patterns) {
+            if (p.matcher(name).matches()) {
+                ret = true;
+
+                break;
+            }
+        }
+
+        return ret;
+    }
 }
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java
 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java
new file mode 100644
index 0000000..adc1983
--- /dev/null
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/RdbmsPreprocessor.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.notification.preprocessor;
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.commons.collections.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+
+public class RdbmsPreprocessor {
+    private static final Logger LOG = 
LoggerFactory.getLogger(RdbmsPreprocessor.class);
+
+    static class RdbmsInstancePreprocessor extends RdbmsTypePreprocessor {
+        public RdbmsInstancePreprocessor() {
+            super(TYPE_RDBMS_INSTANCE);
+        }
+    }
+
+    static class RdbmsDbPreprocessor extends RdbmsTypePreprocessor {
+        public RdbmsDbPreprocessor() {
+            super(TYPE_RDBMS_DB);
+        }
+    }
+
+    static class RdbmsTablePreprocessor extends RdbmsTypePreprocessor {
+        public RdbmsTablePreprocessor() {
+            super(TYPE_RDBMS_TABLE);
+        }
+        @Override
+        public void preprocess(AtlasEntity entity, PreprocessorContext 
context) {
+            super.preprocess(entity, context);
+
+            // try auto-fix when 'db' attribute is not present in 
relationshipAttribute & attributes
+            Object db = entity.getRelationshipAttribute(ATTRIBUTE_DB);
+
+            if (db == null) {
+                db = entity.getAttribute(ATTRIBUTE_DB);
+            }
+
+            if (db == null) {
+                String dbQualifiedName = getDbQualifiedName(entity);
+
+                if (dbQualifiedName != null) {
+                    AtlasObjectId dbId = new AtlasObjectId(TYPE_RDBMS_DB, 
Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, dbQualifiedName));
+
+                    LOG.info("missing attribute {}.{} is set to {}", 
TYPE_RDBMS_TABLE, ATTRIBUTE_DB, dbId);
+
+                    entity.setRelationshipAttribute(ATTRIBUTE_DB, dbId);
+                }
+            }
+        }
+
+        private String getDbQualifiedName(AtlasEntity tableEntity) {
+            String ret              = null;
+            Object tblQualifiedName = 
tableEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);  // 
dbName.tblName@clusterName
+            Object tblName          = 
tableEntity.getAttribute(ATTRIBUTE_NAME);  // tblName
+
+            if (tblQualifiedName != null && tblName != null) {
+                ret = tblQualifiedName.toString().replace("." + 
tblName.toString() + "@", "@"); // dbName@clusterName
+            }
+
+            return ret;
+        }
+
+    }
+
+    static class RdbmsTypePreprocessor extends EntityPreprocessor {
+        private static final Set<String> entityTypesToMove = new HashSet<>();
+
+        static {
+            entityTypesToMove.add(TYPE_RDBMS_DB);
+            entityTypesToMove.add(TYPE_RDBMS_TABLE);
+            entityTypesToMove.add(TYPE_RDBMS_COLUMN);
+            entityTypesToMove.add(TYPE_RDBMS_INDEX);
+            entityTypesToMove.add(TYPE_RDBMS_FOREIGN_KEY);
+        }
+
+        protected RdbmsTypePreprocessor(String typeName) {
+            super(typeName);
+        }
+
+        @Override
+        public void preprocess(AtlasEntity entity, PreprocessorContext 
context) {
+            if (context.getRdbmsTypesRemoveOwnedRefAttrs()) {
+                clearRefAttributesAndMove(entity, context);
+
+                Map<String, AtlasEntity> referredEntities = 
context.getReferredEntities();
+
+                if (MapUtils.isNotEmpty(referredEntities)) {
+                    for (AtlasEntity referredEntity : 
referredEntities.values()) {
+                        if 
(entityTypesToMove.contains(referredEntity.getTypeName())) {
+                            clearRefAttributesAndMove(referredEntity, context);
+                        }
+                    }
+                }
+            }
+        }
+
+        private void clearRefAttributesAndMove(AtlasEntity entity, 
PreprocessorContext context) {
+            switch (entity.getTypeName()) {
+                case TYPE_RDBMS_INSTANCE:
+                    context.removeRefAttributeAndRegisterToMove(entity, 
ATTRIBUTE_DATABASES);
+                break;
+
+                case TYPE_RDBMS_DB:
+                    context.removeRefAttributeAndRegisterToMove(entity, 
ATTRIBUTE_TABLES);
+                break;
+
+                case TYPE_RDBMS_TABLE:
+                    context.removeRefAttributeAndRegisterToMove(entity, 
ATTRIBUTE_COLUMNS);
+                    context.removeRefAttributeAndRegisterToMove(entity, 
ATTRIBUTE_INDEXES);
+                    context.removeRefAttributeAndRegisterToMove(entity, 
ATTRIBUTE_FOREIGN_KEYS);
+                break;
+            }
+        }
+    }
+}

Reply via email to