This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 2471525  ATLAS-3067: updated hive types to remove use of ownedRef/inverseRef attributes
2471525 is described below

commit 24715256558bed6f89d4bec25c6eada0c9db2075
Author: Madhan Neethiraj <mad...@apache.org>
AuthorDate: Wed Mar 6 23:49:40 2019 -0800

    ATLAS-3067: updated hive types to remove use of ownedRef/inverseRef attributes
---
 .../patches/008-remove-hive-legacy-attributes.json | 50 ++++++++++++++++
 .../003-remove-rdbms-legacy-attributes.json        | 54 ++++++++---------
 .../bootstrap/AtlasTypeDefStoreInitializer.java    | 44 ++++++++++----
 .../graph/v2/AtlasRelationshipDefStoreV2.java      | 20 ++++---
 .../notification/NotificationHookConsumer.java     | 47 ++++++++-------
 .../preprocessor/EntityPreprocessor.java           |  2 +
 .../preprocessor/HivePreprocessor.java             | 67 ++++++++++++++++++++++
 .../preprocessor/PreprocessorContext.java          |  6 +-
 8 files changed, 223 insertions(+), 67 deletions(-)

diff --git 
a/addons/models/1000-Hadoop/patches/008-remove-hive-legacy-attributes.json 
b/addons/models/1000-Hadoop/patches/008-remove-hive-legacy-attributes.json
new file mode 100644
index 0000000..32a0876
--- /dev/null
+++ b/addons/models/1000-Hadoop/patches/008-remove-hive-legacy-attributes.json
@@ -0,0 +1,50 @@
+{
+  "patches": [
+    {
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
+      "typeName":        "hive_table_db",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
+      "params": {
+        "relationshipLabel": "__hive_table.db"
+      }
+    },
+    {
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
+      "typeName":        "hive_table_columns",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
+      "params": {
+        "relationshipLabel": "__hive_table.columns"
+      }
+    },
+    {
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
+      "typeName":        "hive_table_partitionkeys",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
+      "params": {
+        "relationshipLabel": "__hive_table.partitionkeys"
+      }
+    },
+    {
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
+      "typeName":        "hive_table_storagedesc",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
+      "params": {
+        "relationshipLabel": "__hive_table.sd"
+      }
+    },
+    {
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
+      "typeName":        "hive_process_column_lineage",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
+      "params": {
+        "relationshipLabel": "__hive_column_lineage.query",
+        "swapEnds":          "true"
+      }
+    }
+  ]
+}
diff --git 
a/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json 
b/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json
index d087c66..5531bee 100644
--- a/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json
+++ b/addons/models/2000-RDBMS/patches/003-remove-rdbms-legacy-attributes.json
@@ -1,87 +1,87 @@
 {
   "patches": [
     {
-      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
       "typeName":        "rdbms_instance_databases",
-      "applyToVersion":  "1.0",
-      "updateToVersion": "1.1",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
       "params": {
         "relationshipLabel":    "__rdbms_instance.databases",
         "relationshipCategory": "COMPOSITION"
       }
     },
     {
-      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
       "typeName":        "rdbms_db_tables",
-      "applyToVersion":  "1.0",
-      "updateToVersion": "1.1",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
       "params": {
         "relationshipLabel":    "__rdbms_db.tables",
         "relationshipCategory": "COMPOSITION"
       }
     },
     {
-      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
       "typeName":        "rdbms_table_columns",
-      "applyToVersion":  "1.0",
-      "updateToVersion": "1.1",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
       "params": {
         "relationshipLabel":    "__rdbms_table.columns",
         "relationshipCategory": "COMPOSITION"
       }
     },
     {
-      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
       "typeName":        "rdbms_table_indexes",
-      "applyToVersion":  "1.0",
-      "updateToVersion": "1.1",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
       "params": {
         "relationshipLabel":    "__rdbms_table.indexes",
         "relationshipCategory": "COMPOSITION"
       }
     },
     {
-      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
       "typeName":        "rdbms_table_foreign_key",
-      "applyToVersion":  "1.0",
-      "updateToVersion": "1.1",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
       "params": {
         "relationshipLabel":    "__rdbms_table.foreign_keys",
         "relationshipCategory": "COMPOSITION"
       }
     },
     {
-      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
       "typeName":        "rdbms_index_columns",
-      "applyToVersion":  "1.0",
-      "updateToVersion": "1.1",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
       "params": {
         "relationshipLabel": "__rdbms_index.columns"
       }
     },
     {
-      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
       "typeName":        "rdbms_foreign_key_key_columns",
-      "applyToVersion":  "1.0",
-      "updateToVersion": "1.1",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
       "params": {
         "relationshipLabel": "__rdbms_foreign_key.key_columns"
       }
     },
     {
-      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
       "typeName":        "rdbms_foreign_key_table_references",
-      "applyToVersion":  "1.0",
-      "updateToVersion": "1.1",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
       "params": {
         "relationshipLabel": "__rdbms_foreign_key.references_table"
       }
     },
     {
-      "action":          "REMOVE_LEGACY_ATTRIBUTES",
+      "action":          "REMOVE_LEGACY_REF_ATTRIBUTES",
       "typeName":        "rdbms_foreign_key_column_references",
-      "applyToVersion":  "1.0",
-      "updateToVersion": "1.1",
+      "applyToVersion":  "1.1",
+      "updateToVersion": "1.2",
       "params": {
         "relationshipLabel": "__rdbms_foreign_key.references_columns"
       }
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
index d12284e..66162aa 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
@@ -78,9 +78,10 @@ import static 
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
 @Service
 public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
     private static final Logger LOG = 
LoggerFactory.getLogger(AtlasTypeDefStoreInitializer.class);
-    public static final String PATCHES_FOLDER_NAME   = "patches";
-    public static final String RELATIONSHIP_LABEL    = "relationshipLabel";
-    public static final String RELATIONSHIP_CATEGORY = "relationshipCategory";
+    public static final String PATCHES_FOLDER_NAME    = "patches";
+    public static final String RELATIONSHIP_LABEL     = "relationshipLabel";
+    public static final String RELATIONSHIP_CATEGORY  = "relationshipCategory";
+    public static final String RELATIONSHIP_SWAP_ENDS = "swapEnds";
 
     private final AtlasTypeDefStore atlasTypeDefStore;
     private final AtlasTypeRegistry atlasTypeRegistry;
@@ -414,7 +415,7 @@ public class AtlasTypeDefStoreInitializer implements 
ActiveStateChangeHandler {
             PatchHandler[] patchHandlers = new PatchHandler[] {
                     new AddAttributePatchHandler(atlasTypeDefStore, 
atlasTypeRegistry),
                     new UpdateAttributePatchHandler(atlasTypeDefStore, 
atlasTypeRegistry),
-                    new RemoveLegacyAttributesPatchHandler(atlasTypeDefStore, 
atlasTypeRegistry),
+                    new 
RemoveLegacyRefAttributesPatchHandler(atlasTypeDefStore, atlasTypeRegistry),
                     new UpdateTypeDefOptionsPatchHandler(atlasTypeDefStore, 
atlasTypeRegistry),
                     new SetServiceTypePatchHandler(atlasTypeDefStore, 
atlasTypeRegistry)
             };
@@ -710,9 +711,9 @@ public class AtlasTypeDefStoreInitializer implements 
ActiveStateChangeHandler {
         }
     }
 
-    class RemoveLegacyAttributesPatchHandler extends PatchHandler {
-        public RemoveLegacyAttributesPatchHandler(AtlasTypeDefStore 
typeDefStore, AtlasTypeRegistry typeRegistry) {
-            super(typeDefStore, typeRegistry, new String[] { 
"REMOVE_LEGACY_ATTRIBUTES" });
+    class RemoveLegacyRefAttributesPatchHandler extends PatchHandler {
+        public RemoveLegacyRefAttributesPatchHandler(AtlasTypeDefStore 
typeDefStore, AtlasTypeRegistry typeRegistry) {
+            super(typeDefStore, typeRegistry, new String[] { 
"REMOVE_LEGACY_REF_ATTRIBUTES" });
         }
 
         @Override
@@ -734,10 +735,12 @@ public class AtlasTypeDefStoreInitializer implements 
ActiveStateChangeHandler {
 
                     String               newRelationshipLabel    = null;
                     RelationshipCategory newRelationshipCategory = null;
+                    boolean              swapEnds                = false;
 
                     if (patch.getParams() != null) {
                         Object relLabel    = 
patch.getParams().get(RELATIONSHIP_LABEL);
                         Object relCategory = 
patch.getParams().get(RELATIONSHIP_CATEGORY);
+                        Object relSwapEnds = 
patch.getParams().get(RELATIONSHIP_SWAP_ENDS);
 
                         if (relLabel != null) {
                             newRelationshipLabel = relLabel.toString();
@@ -746,6 +749,10 @@ public class AtlasTypeDefStoreInitializer implements 
ActiveStateChangeHandler {
                         if (relCategory != null) {
                             newRelationshipCategory = 
RelationshipCategory.valueOf(relCategory.toString());
                         }
+
+                        if (relSwapEnds != null) {
+                            swapEnds = Boolean.valueOf(relSwapEnds.toString());
+                        }
                     }
 
                     if (StringUtils.isEmpty(newRelationshipLabel)) {
@@ -766,9 +773,19 @@ public class AtlasTypeDefStoreInitializer implements 
ActiveStateChangeHandler {
                         }
                     }
 
-                    AtlasRelationshipDef updatedDef        = new 
AtlasRelationshipDef(relationshipDef);
-                    AtlasEntityDef       updatedEntityDef1 = new 
AtlasEntityDef(end1Type.getEntityDef());
-                    AtlasEntityDef       updatedEntityDef2 = new 
AtlasEntityDef(end2Type.getEntityDef());
+                    AtlasRelationshipDef updatedDef = new 
AtlasRelationshipDef(relationshipDef);
+
+                    if (swapEnds) {
+                        AtlasRelationshipEndDef tmp = updatedDef.getEndDef1();
+
+                        updatedDef.setEndDef1(updatedDef.getEndDef2());
+                        updatedDef.setEndDef2(tmp);
+                    }
+
+                    end1Def  = updatedDef.getEndDef1();
+                    end2Def  = updatedDef.getEndDef2();
+                    end1Type = 
typeRegistry.getEntityTypeByName(end1Def.getType());
+                    end2Type = 
typeRegistry.getEntityTypeByName(end2Def.getType());
 
                     updatedDef.setRelationshipLabel(newRelationshipLabel);
 
@@ -776,10 +793,13 @@ public class AtlasTypeDefStoreInitializer implements 
ActiveStateChangeHandler {
                         
updatedDef.setRelationshipCategory(newRelationshipCategory);
                     }
 
-                    updatedDef.getEndDef1().setIsLegacyAttribute(false);
-                    updatedDef.getEndDef2().setIsLegacyAttribute(false);
+                    end1Def.setIsLegacyAttribute(false);
+                    end2Def.setIsLegacyAttribute(false);
                     updatedDef.setTypeVersion(patch.getUpdateToVersion());
 
+                    AtlasEntityDef updatedEntityDef1 = new 
AtlasEntityDef(end1Type.getEntityDef());
+                    AtlasEntityDef updatedEntityDef2 = new 
AtlasEntityDef(end2Type.getEntityDef());
+
                     updatedEntityDef1.removeAttribute(end1Def.getName());
                     updatedEntityDef2.removeAttribute(end2Def.getName());
 
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
index 35d0577..332c18a 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasRelationshipDefStoreV2.java
@@ -436,19 +436,25 @@ public class AtlasRelationshipDefStoreV2 extends 
AtlasAbstractDefStoreV2<AtlasRe
         }
 
         AtlasRelationshipEndDef existingEnd1 = 
existingRelationshipDef.getEndDef1();
+        AtlasRelationshipEndDef existingEnd2 = 
existingRelationshipDef.getEndDef2();
         AtlasRelationshipEndDef newEnd1      = newRelationshipDef.getEndDef1();
+        AtlasRelationshipEndDef newEnd2      = newRelationshipDef.getEndDef2();
+        boolean                 endsSwaped   = false;
 
         if ( !isValidUpdate(existingEnd1, newEnd1) ) {
-            throw new 
AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END1_UPDATE,
-                                         newRelationshipDef.getName(), 
newEnd1.toString(), existingEnd1.toString());
+            if (RequestContext.get().isInTypePatching() && 
isValidUpdate(existingEnd1, newEnd2)) { // allow swap of ends during type-patch
+                endsSwaped = true;
+            } else {
+                throw new 
AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END1_UPDATE,
+                                             newRelationshipDef.getName(), 
newEnd1.toString(), existingEnd1.toString());
+            }
         }
 
-        AtlasRelationshipEndDef existingEnd2 = 
existingRelationshipDef.getEndDef2();
-        AtlasRelationshipEndDef newEnd2      = newRelationshipDef.getEndDef2();
+        AtlasRelationshipEndDef newEndToCompareWith = endsSwaped ? newEnd1 : 
newEnd2;
 
-        if ( !isValidUpdate(existingEnd2, newEnd2) ) {
+        if ( !isValidUpdate(existingEnd2, newEndToCompareWith) ) {
                 throw new 
AtlasBaseException(AtlasErrorCode.RELATIONSHIPDEF_INVALID_END2_UPDATE,
-                                         newRelationshipDef.getName(), 
newEnd2.toString(), existingEnd2.toString());
+                                             newRelationshipDef.getName(), 
newEndToCompareWith.toString(), existingEnd2.toString());
         }
     }
 
@@ -520,7 +526,7 @@ public class AtlasRelationshipDefStoreV2 extends 
AtlasAbstractDefStoreV2<AtlasRe
     }
 
     private static boolean isValidUpdate(AtlasRelationshipEndDef currentDef, 
AtlasRelationshipEndDef updatedDef) {
-        // permit updates to description and isLegacyAttribute (ref type-patch 
REMOVE_LEGACY_ATTRIBUTES)
+        // permit updates to description and isLegacyAttribute (ref type-patch 
REMOVE_LEGACY_REF_ATTRIBUTES)
         return Objects.equals(currentDef.getType(), updatedDef.getType()) &&
                 Objects.equals(currentDef.getName(), updatedDef.getName()) &&
                 Objects.equals(currentDef.getIsContainer(), 
updatedDef.getIsContainer()) &&
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 50559dd..d16d544 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -78,7 +78,6 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
@@ -87,6 +86,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
+
 /**
  * Consumer of notifications from hooks e.g., hive hook etc.
  */
@@ -123,6 +123,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     public static final String CONSUMER_PREPROCESS_HIVE_TABLE_IGNORE_PATTERN   
              = 
"atlas.notification.consumer.preprocess.hive_table.ignore.pattern";
     public static final String CONSUMER_PREPROCESS_HIVE_TABLE_PRUNE_PATTERN    
              = 
"atlas.notification.consumer.preprocess.hive_table.prune.pattern";
     public static final String CONSUMER_PREPROCESS_HIVE_TABLE_CACHE_SIZE       
              = "atlas.notification.consumer.preprocess.hive_table.cache.size";
+    public static final String 
CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS          = 
"atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs";
     public static final String 
CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS         = 
"atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs";
 
     public static final int SERVER_READY_WAIT_TIME_MS = 1000;
@@ -143,6 +144,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     private final List<Pattern>                 hiveTablesToIgnore = new 
ArrayList<>();
     private final List<Pattern>                 hiveTablesToPrune  = new 
ArrayList<>();
     private final Map<String, PreprocessAction> hiveTablesCache;
+    private final boolean                       hiveTypesRemoveOwnedRefAttrs;
     private final boolean                       rdbmsTypesRemoveOwnedRefAttrs;
     private final boolean                       preprocessEnabled;
 
@@ -172,7 +174,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         consumerRetryInterval = 
applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
         minWaitDuration       = 
applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, 
consumerRetryInterval); // 500 ms  by default
         maxWaitDuration       = 
applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 
60);  //  30 sec by default
-        commitBatchSize       = 
applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 50);
+        commitBatchSize       = 
applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 0);
 
         skipHiveColumnLineageHive20633                = 
applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, 
false);
         skipHiveColumnLineageHive20633InputsThreshold = 
applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD,
 15); // skip if avg # of inputs is > 15
@@ -214,11 +216,16 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             hiveTablesCache = Collections.emptyMap();
         }
 
-        rdbmsTypesRemoveOwnedRefAttrs = 
applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS,
 true);
-        preprocessEnabled             = !hiveTablesToIgnore.isEmpty() || 
!hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || 
rdbmsTypesRemoveOwnedRefAttrs;
+        hiveTypesRemoveOwnedRefAttrs  = 
applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS,
 false);
+        rdbmsTypesRemoveOwnedRefAttrs = 
applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS,
 false);
+        preprocessEnabled             = !hiveTablesToIgnore.isEmpty() || 
!hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || 
hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs;
 
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, 
skipHiveColumnLineageHive20633);
         LOG.info("{}={}", 
CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 
skipHiveColumnLineageHive20633InputsThreshold);
+        LOG.info("{}={}", 
CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, 
hiveTypesRemoveOwnedRefAttrs);
+        LOG.info("{}={}", 
CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, 
rdbmsTypesRemoveOwnedRefAttrs);
+        LOG.info("{}={}", CONSUMER_COMMIT_BATCH_SIZE, commitBatchSize);
+        LOG.info("{}={}", CONSUMER_DISABLED, consumerDisabled);
     }
 
     @Override
@@ -694,7 +701,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             List<AtlasEntity> entitiesList = entities.getEntities();
             AtlasEntityStream entityStream = new AtlasEntityStream(entities);
 
-            if (entitiesList.size() <= commitBatchSize) {
+            if (commitBatchSize <= 0 || entitiesList.size() <= 
commitBatchSize) {
                 atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate);
             } else {
                 for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx 
+= commitBatchSize) {
@@ -792,10 +799,10 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             return;
         }
 
-        PreprocessorContext context = new PreprocessorContext(kafkaMsg, 
hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, 
rdbmsTypesRemoveOwnedRefAttrs);
+        PreprocessorContext context = new PreprocessorContext(kafkaMsg, 
hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, 
hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs);
 
-        if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty()) {
-            ignoreOrPruneHiveTables(context);
+        if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || 
hiveTypesRemoveOwnedRefAttrs) {
+            preprocessHiveTypes(context);
         }
 
         if (skipHiveColumnLineageHive20633) {
@@ -813,8 +820,8 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         List<AtlasEntity> entities = context.getEntities();
 
         if (entities != null) {
-            for (ListIterator<AtlasEntity> iter = entities.listIterator(); 
iter.hasNext(); ) {
-                AtlasEntity        entity       = iter.next();
+            for (int i = 0; i < entities.size(); i++) {
+                AtlasEntity        entity       = entities.get(i);
                 EntityPreprocessor preprocessor = 
EntityPreprocessor.getRdbmsPreprocessor(entity.getTypeName());
 
                 if (preprocessor != null) {
@@ -824,19 +831,19 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         }
     }
 
-    private void ignoreOrPruneHiveTables(PreprocessorContext context) {
+    private void preprocessHiveTypes(PreprocessorContext context) {
         List<AtlasEntity> entities = context.getEntities();
 
         if (entities != null) {
-            for (ListIterator<AtlasEntity> iter = entities.listIterator(); 
iter.hasNext(); ) {
-                AtlasEntity        entity       = iter.next();
+            for (int i = 0; i < entities.size(); i++) {
+                AtlasEntity        entity       = entities.get(i);
                 EntityPreprocessor preprocessor = 
EntityPreprocessor.getHivePreprocessor(entity.getTypeName());
 
                 if (preprocessor != null) {
                     preprocessor.preprocess(entity, context);
 
                     if (context.isIgnoredEntity(entity.getGuid())) {
-                        iter.remove();
+                        entities.remove(i--);
                     }
                 }
             }
@@ -877,8 +884,8 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             Set<String> lineageQNames      = new HashSet<>();
 
             // find if all hive_column_lineage entities have same number of 
inputs, which is likely to be caused by HIVE-20633 that results in incorrect 
lineage in some cases
-            for (ListIterator<AtlasEntity> iter = entities.listIterator(); 
iter.hasNext(); ) {
-                AtlasEntity entity = iter.next();
+            for (int i = 0; i < entities.size(); i++) {
+                AtlasEntity entity = entities.get(i);
 
                 if (StringUtils.equals(entity.getTypeName(), 
TYPE_HIVE_COLUMN_LINEAGE)) {
                     final Object qName = 
entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
@@ -887,7 +894,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                         final String qualifiedName = qName.toString();
 
                         if (lineageQNames.contains(qualifiedName)) {
-                            iter.remove();
+                            entities.remove(i--);
 
                             LOG.warn("removed duplicate hive_column_lineage 
entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, 
lineageInputsCount, context.getKafkaMessageOffset(), 
context.getKafkaPartition());
 
@@ -914,11 +921,11 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             float avgInputsCount = lineageCount > 0 ? (((float) 
lineageInputsCount) / lineageCount) : 0;
 
             if (avgInputsCount > 
skipHiveColumnLineageHive20633InputsThreshold) {
-                for (ListIterator<AtlasEntity> iter = entities.listIterator(); 
iter.hasNext(); ) {
-                    AtlasEntity entity = iter.next();
+                for (int i = 0; i < entities.size(); i++) {
+                    AtlasEntity entity = entities.get(i);
 
                     if (StringUtils.equals(entity.getTypeName(), 
TYPE_HIVE_COLUMN_LINEAGE)) {
-                        iter.remove();
+                        entities.remove(i--);
 
                         numRemovedEntities++;
                     }
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
index 7eba27a..085e746 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java
@@ -48,6 +48,8 @@ public abstract class EntityPreprocessor {
     public static final String ATTRIBUTE_SD             = "sd";
     public static final String ATTRIBUTE_DB             = "db";
     public static final String ATTRIBUTE_DATABASES      = "databases";
+    public static final String ATTRIBUTE_QUERY          = "query";
+    public static final String ATTRIBUTE_TABLE          = "table";
     public static final String ATTRIBUTE_TABLES         = "tables";
     public static final String ATTRIBUTE_INDEXES        = "indexes";
     public static final String ATTRIBUTE_FOREIGN_KEYS   = "foreign_keys";
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
index d54c88d..ff9c9cb 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -18,14 +18,26 @@
 package org.apache.atlas.notification.preprocessor;
 
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
 import 
org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
 import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class HivePreprocessor {
+    private static final Logger LOG = 
LoggerFactory.getLogger(HivePreprocessor.class);
+
+    private static final String RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS        = 
"hive_table_columns"; // matches the typeName used in 008-remove-hive-legacy-attributes.json
+    private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = 
"hive_table_partitionkeys"; // all lower-case, matching the typeName used in 008-remove-hive-legacy-attributes.json
+
     static class HiveTablePreprocessor extends EntityPreprocessor {
         public HiveTablePreprocessor() {
             super(TYPE_HIVE_TABLE);
@@ -54,9 +66,64 @@ public class HivePreprocessor {
                     entity.setAttribute(ATTRIBUTE_SD, null);
                     entity.setAttribute(ATTRIBUTE_COLUMNS, null);
                     entity.setAttribute(ATTRIBUTE_PARTITION_KEYS, null);
+                } else if (context.getHiveTypesRemoveOwnedRefAttrs()) {
+                    context.removeRefAttributeAndRegisterToMove(entity, 
ATTRIBUTE_SD);
+
+                    removeColumnsAttributeAndRegisterToMove(entity, 
ATTRIBUTE_COLUMNS, RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS, context);
+                    removeColumnsAttributeAndRegisterToMove(entity, 
ATTRIBUTE_PARTITION_KEYS, RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS, context);
                 }
             }
         }
+
+        private void removeColumnsAttributeAndRegisterToMove(AtlasEntity 
entity, String attrName, String relationshipType, PreprocessorContext context) { // for each entity referenced by attrName: tag its 'table' reference with relationshipType, then register it to move
+            Object attrVal = entity.getAttribute(attrName); // NOTE(review): read only - despite the method name the attribute is not removed from entity here; confirm intended
+
+            if (attrVal != null) {
+                Set<String> guids = new HashSet<>();
+
+                context.collectGuids(attrVal, guids); // presumably fills 'guids' from the references held in attrVal - confirm in PreprocessorContext
+
+                for (String guid : guids) {
+                    AtlasEntity colEntity = context.getEntity(guid); // guids for which the context returns no entity are skipped
+
+                    if (colEntity != null) {
+                        Object attrTable = null;
+
+                        if 
(colEntity.hasRelationshipAttribute(ATTRIBUTE_TABLE)) { // a relationship attribute takes precedence over a plain attribute
+                            attrTable = 
colEntity.getRelationshipAttribute(ATTRIBUTE_TABLE);
+                        } else if (colEntity.hasAttribute(ATTRIBUTE_TABLE)) {
+                            attrTable = 
colEntity.getAttribute(ATTRIBUTE_TABLE);
+                        }
+
+                        attrTable = setRelationshipType(attrTable, 
relationshipType); // null when attrTable is not an AtlasRelatedObjectId, AtlasObjectId or Map
+
+                        if (attrTable != null) {
+                            
colEntity.setRelationshipAttribute(ATTRIBUTE_TABLE, attrTable); // 'table' is always written back as a relationship attribute
+                        }
+
+                        context.addToReferredEntitiesToMove(guid); // registered even when no 'table' reference was found
+                    }
+                }
+            }
+        }
+
+        private AtlasRelatedObjectId setRelationshipType(Object attr, String 
relationshipType) { // converts attr to an AtlasRelatedObjectId and sets relationshipType on it; returns null for any other type (including null)
+            AtlasRelatedObjectId ret = null;
+
+            if (attr instanceof AtlasRelatedObjectId) {
+                ret = (AtlasRelatedObjectId) attr; // reused as-is, so the caller's object is the one modified below
+            } else if (attr instanceof AtlasObjectId) {
+                ret = new AtlasRelatedObjectId((AtlasObjectId) attr); // wraps the plain object-id in a new AtlasRelatedObjectId
+            } else if (attr instanceof Map) {
+                ret = new AtlasRelatedObjectId((Map) attr); // raw Map: presumably a deserialized object-id - TODO confirm
+            }
+
+            if (ret != null) {
+                ret.setRelationshipType(relationshipType);
+            }
+
+            return ret;
+        }
     }
 
 
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index 0f95fba..94e0993 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -44,16 +44,18 @@ public class PreprocessorContext {
     private final List<Pattern>                       hiveTablesToIgnore;
     private final List<Pattern>                       hiveTablesToPrune;
     private final Map<String, PreprocessAction>       hiveTablesCache;
+    private final boolean                             
hiveTypesRemoveOwnedRefAttrs;
     private final boolean                             
rdbmsTypesRemoveOwnedRefAttrs;
     private final Set<String>                         ignoredEntities        = 
new HashSet<>();
     private final Set<String>                         prunedEntities         = 
new HashSet<>();
     private final Set<String>                         referredEntitiesToMove = 
new HashSet<>();
 
-    public PreprocessorContext(AtlasKafkaMessage<HookNotification> 
kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> 
hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean 
rdbmsTypesRemoveOwnedRefAttrs) {
+    public PreprocessorContext(AtlasKafkaMessage<HookNotification> 
kafkaMessage, List<Pattern> hiveTablesToIgnore, List<Pattern> 
hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, boolean 
hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs) {
         this.kafkaMessage                  = kafkaMessage;
         this.hiveTablesToIgnore            = hiveTablesToIgnore;
         this.hiveTablesToPrune             = hiveTablesToPrune;
         this.hiveTablesCache               = hiveTablesCache;
+        this.hiveTypesRemoveOwnedRefAttrs  = hiveTypesRemoveOwnedRefAttrs;
         this.rdbmsTypesRemoveOwnedRefAttrs = rdbmsTypesRemoveOwnedRefAttrs;
 
         final HookNotification  message = kafkaMessage.getMessage();
@@ -85,6 +87,8 @@ public class PreprocessorContext {
         return kafkaMessage.getPartition();
     }
 
+    public boolean getHiveTypesRemoveOwnedRefAttrs() { return 
hiveTypesRemoveOwnedRefAttrs; } // returns the flag value supplied to the constructor
+
     public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return 
rdbmsTypesRemoveOwnedRefAttrs; }
 
     public List<AtlasEntity> getEntities() {

Reply via email to