This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 46458a6  ATLAS-4181: Provide option to add mandatory attribute to 
existing entity definition
46458a6 is described below

commit 46458a6c088313beeb11f769541ef3f0cac9910e
Author: Radhika Kundam <[email protected]>
AuthorDate: Fri Mar 5 09:33:48 2021 -0800

    ATLAS-4181: Provide option to add mandatory attribute to existing entity 
definition
    
    Signed-off-by: Madhan Neethiraj <[email protected]>
    (cherry picked from commit d653cea3c26b025ad7d6c37a2df41fff745fd952)
---
 .../org/apache/atlas/repository/Constants.java     |   5 +
 .../graphdb/janus/AtlasSolrQueryBuilderTest.java   |   2 +-
 .../patches/AddMandatoryAttributesPatch.java       | 145 +++++++++++++++++++++
 .../bootstrap/AtlasTypeDefStoreInitializer.java    | 113 +++++++++++++++-
 .../store/graph/v2/AtlasStructDefStoreV2.java      |   9 +-
 .../main/java/org/apache/atlas/RequestContext.java |   9 ++
 6 files changed, 279 insertions(+), 4 deletions(-)

diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java 
b/common/src/main/java/org/apache/atlas/repository/Constants.java
index 61abfca..771287f 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -203,6 +203,11 @@ public final class Constants {
     public static final Integer INCOMPLETE_ENTITY_VALUE   = Integer.valueOf(1);
 
     /*
+     * typedef patch constants
+     */
+    public static final String  TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE   = 
"ADD_MANDATORY_ATTRIBUTE";
+
+    /*
      * All supported file-format extensions for Bulk Imports through file 
upload
      */
     public enum SupportedFileExtensions { XLSX, XLS, CSV }
diff --git 
a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasSolrQueryBuilderTest.java
 
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasSolrQueryBuilderTest.java
index 06c7221..c2acc5b 100644
--- 
a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasSolrQueryBuilderTest.java
+++ 
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasSolrQueryBuilderTest.java
@@ -200,7 +200,7 @@ public class AtlasSolrQueryBuilderTest {
 
         processSearchParameters(fileName, underTest);
 
-        Assert.assertEquals(underTest.build(), "+t10  AND  
-__state_index:DELETED AND  +__typeName__index:(hive_table )  AND  ( ( 
+created__index:[ * TO100}  ) )");
+        Assert.assertEquals(underTest.build(), "+t10  AND  
-__state_index:DELETED AND  +__typeName__index:(hive_table )  AND  ( ( 
+created__index:[ * TO 100}  ) )");
     }
 
     @Test
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/patches/AddMandatoryAttributesPatch.java
 
b/repository/src/main/java/org/apache/atlas/repository/patches/AddMandatoryAttributesPatch.java
new file mode 100644
index 0000000..3102516
--- /dev/null
+++ 
b/repository/src/main/java/org/apache/atlas/repository/patches/AddMandatoryAttributesPatch.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
+import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
+
+/**
+ * Java patch that sets default values for newly added mandatory attributes on
+ * existing entities of a given type and all of its sub-types.
+ *
+ * The patch id is PATCH_ID suffixed with the id of the originating typedef patch.
+ */
+public class AddMandatoryAttributesPatch extends AtlasPatchHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(AddMandatoryAttributesPatch.class);
+
+    private static final String PATCH_ID          = "JAVA_PATCH_0000_008";
+    private static final String PATCH_DESCRIPTION = "Add mandatory attributes for all existing entities for given typeName";
+
+    private final PatchContext            context;
+    private final String                  typeName;
+    private final List<AtlasAttributeDef> attributesToAdd;
+
+    /**
+     * @param context         patch context; also supplies the patch registry this handler registers with
+     * @param typedefPatchId  id of the typedef patch that added the attributes; appended to PATCH_ID
+     * @param typeName        name of the type whose existing entities are to be updated
+     * @param attributesToAdd attribute definitions whose default values are written to entities lacking a value
+     */
+    public AddMandatoryAttributesPatch(PatchContext context, String typedefPatchId, String typeName, List<AtlasAttributeDef> attributesToAdd) {
+        super(context.getPatchRegistry(), PATCH_ID + "_" + typedefPatchId, PATCH_DESCRIPTION);
+
+        this.context         = context;
+        this.typeName        = typeName;
+        this.attributesToAdd = attributesToAdd;
+    }
+
+    /**
+     * Runs the concurrent processor for the configured type and then marks this patch as APPLIED.
+     *
+     * @throws AtlasBaseException if the patch processor fails
+     */
+    @Override
+    public void apply() throws AtlasBaseException {
+        // log under the actual class name so that these entries match the processor's log entries
+        LOG.info("==> AddMandatoryAttributesPatch.apply(): patchId={}", getPatchId());
+
+        ConcurrentPatchProcessor patchProcessor = new AddMandatoryAttributesPatchProcessor(context, typeName, attributesToAdd);
+
+        patchProcessor.apply();
+
+        setStatus(APPLIED);
+
+        LOG.info("<== AddMandatoryAttributesPatch.apply(): patchId={}, status={}", getPatchId(), getStatus());
+    }
+
+    /**
+     * Processor that submits every vertex of the type (and its sub-types) and writes the
+     * attribute default value to vertices that have no value for the attribute.
+     */
+    public static class AddMandatoryAttributesPatchProcessor extends ConcurrentPatchProcessor {
+        private final Set<String>             typeAndAllSubTypes;
+        private final List<AtlasAttributeDef> attributesToAdd;
+
+        /**
+         * @param context         patch context handed to the base processor
+         * @param typeName        entity-type name; when the type is not found in the registry, no vertices are processed
+         * @param attributesToAdd attribute definitions to apply to each vertex
+         */
+        public AddMandatoryAttributesPatchProcessor(PatchContext context, String typeName, List<AtlasAttributeDef> attributesToAdd) {
+            super(context);
+
+            AtlasEntityType entityType = getTypeRegistry().getEntityTypeByName(typeName);
+
+            this.attributesToAdd = attributesToAdd;
+
+            if (entityType != null) {
+                this.typeAndAllSubTypes = entityType.getTypeAndAllSubTypes();
+            } else {
+                LOG.warn("AddMandatoryAttributesPatchProcessor(): failed to find entity-type {}", typeName);
+
+                this.typeAndAllSubTypes = Collections.emptySet();
+            }
+        }
+
+        /**
+         * Queries the graph for vertices of each type in typeAndAllSubTypes and hands every
+         * vertex id to the work-item manager.
+         */
+        @Override
+        public void submitVerticesToUpdate(WorkItemManager manager) {
+            if (CollectionUtils.isNotEmpty(typeAndAllSubTypes)) {
+                LOG.info("Entity types to be updated with mandatory attributes: {}", typeAndAllSubTypes.size());
+
+                for (String entityTypeName : typeAndAllSubTypes) {
+                    LOG.info("finding entities of type {}", entityTypeName);
+
+                    AtlasGraph       graph     = getGraph();
+                    Iterable<Object> vertexIds = graph.query().has(ENTITY_TYPE_PROPERTY_KEY, entityTypeName).vertexIds();
+                    int              count     = 0;
+
+                    for (Object vertexId : vertexIds) {
+                        manager.checkProduce(vertexId);
+
+                        count++;
+                    }
+
+                    LOG.info("found {} entities of type {}", count, entityTypeName);
+                }
+            }
+        }
+
+        /**
+         * For each attribute to add: when the entity type has the attribute and the vertex has
+         * no value stored under the attribute's vertex property, stores the attribute's default value.
+         */
+        @Override
+        protected void processVertexItem(Long vertexId, AtlasVertex vertex, String typeName, AtlasEntityType entityType) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("==> AddMandatoryAttributesPatchProcessor.processVertexItem(typeName={}, vertexId={})", typeName, vertexId);
+            }
+
+            for (AtlasAttributeDef attributeDef : attributesToAdd) {
+                AtlasAttribute attribute = entityType.getAttribute(attributeDef.getName());
+
+                if (attribute != null) {
+                    Object existingValue = vertex.getProperty(attribute.getVertexPropertyName(), Object.class);
+
+                    // never overwrite a value that is already present on the vertex
+                    if (existingValue == null) {
+                        vertex.setProperty(attribute.getVertexPropertyName(), attributeDef.getDefaultValue());
+                    }
+                }
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("<== AddMandatoryAttributesPatchProcessor.processVertexItem(typeName={}, vertexId={})", typeName, vertexId);
+            }
+        }
+
+        @Override
+        protected void prepareForExecution() {
+            //do nothing
+        }
+    }
+}
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
index 676a0aa..89e9422 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
@@ -28,6 +28,7 @@ import org.apache.atlas.authorize.AtlasAuthorizerFactory;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
+import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
 import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
 import org.apache.atlas.model.typedef.AtlasBusinessMetadataDef;
@@ -41,8 +42,10 @@ import 
org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
 import org.apache.atlas.model.typedef.AtlasStructDef;
 import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.Constants;
 import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.patches.AddMandatoryAttributesPatch;
 import org.apache.atlas.repository.patches.SuperTypesUpdatePatch;
 import org.apache.atlas.repository.patches.AtlasPatchManager;
 import org.apache.atlas.repository.patches.AtlasPatchRegistry;
@@ -454,7 +457,8 @@ public class AtlasTypeDefStoreInitializer implements 
ActiveStateChangeHandler {
                     new UpdateTypeDefOptionsPatchHandler(typeDefStore, 
typeRegistry),
                     new SetServiceTypePatchHandler(typeDefStore, typeRegistry),
                     new UpdateAttributeMetadataHandler(typeDefStore, 
typeRegistry),
-                    new AddSuperTypePatchHandler(typeDefStore, typeRegistry)
+                    new AddSuperTypePatchHandler(typeDefStore, typeRegistry),
+                    new AddMandatoryAttributePatchHandler(typeDefStore, 
typeRegistry)
             };
 
             Map<String, PatchHandler> patchHandlerRegistry = new HashMap<>();
@@ -787,6 +791,113 @@ public class AtlasTypeDefStoreInitializer implements 
ActiveStateChangeHandler {
         }
     }
 
+    /**
+     * Typedef patch handler for action ADD_MANDATORY_ATTRIBUTE: adds the valid mandatory
+     * attributes of the patch to the type definition and registers a Java patch that
+     * updates existing entities of the type.
+     */
+    class AddMandatoryAttributePatchHandler extends PatchHandler {
+        public AddMandatoryAttributePatchHandler(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry) {
+            super(typeDefStore, typeRegistry, new String[] { Constants.TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE });
+        }
+
+        /**
+         * Applies the patch to the type named in it.
+         *
+         * @param patch typedef patch to apply
+         * @return APPLIED when the typedef was updated; SKIPPED when the patch is not applicable
+         *         or carries no valid attribute
+         * @throws AtlasBaseException PATCH_FOR_UNKNOWN_TYPE when the type is not registered;
+         *         PATCH_NOT_APPLICABLE_FOR_TYPE when the typedef is not an entity, classification or struct def
+         */
+        @Override
+        public PatchStatus applyPatch(TypeDefPatch patch) throws AtlasBaseException {
+            String           typeName  = patch.getTypeName();
+            AtlasBaseTypeDef typeDef   = typeRegistry.getTypeDefByName(typeName);
+            PatchStatus      ret;
+
+            if (typeDef == null) {
+                throw new AtlasBaseException(AtlasErrorCode.PATCH_FOR_UNKNOWN_TYPE, patch.getAction(), typeName);
+            }
+
+            if (isPatchApplicable(patch, typeDef)) {
+                // report the patch-specific error rather than a ClassCastException from the cast below
+                if (!(typeDef instanceof AtlasStructDef)) {
+                    throw new AtlasBaseException(AtlasErrorCode.PATCH_NOT_APPLICABLE_FOR_TYPE, patch.getAction(), typeDef.getClass().getSimpleName());
+                }
+
+                List<AtlasAttributeDef> attributesToAdd = getAttributesToAdd(patch, (AtlasStructDef) typeDef);
+
+                if (CollectionUtils.isEmpty(attributesToAdd)) {
+                    LOG.info("patch skipped: typeName={}; mandatory attributes are not valid in patch {}", patch.getTypeName(), patch.getId());
+
+                    ret = SKIPPED;
+                } else {
+                    try {
+                        // these two flags are read by AtlasStructDefStoreV2.isInAddMandatoryAttributePatch()
+                        RequestContext.get().setInTypePatching(true);
+
+                        RequestContext.get().setCurrentTypePatchAction(Constants.TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE);
+
+                        if (typeDef.getClass().equals(AtlasEntityDef.class)) {
+                            AtlasEntityDef updatedDef = new AtlasEntityDef((AtlasEntityDef) typeDef);
+
+                            updateTypeDefWithPatch(patch, updatedDef, attributesToAdd);
+
+                            typeDefStore.updateEntityDefByName(typeName, updatedDef);
+                        } else if (typeDef.getClass().equals(AtlasClassificationDef.class)) {
+                            AtlasClassificationDef updatedDef = new AtlasClassificationDef((AtlasClassificationDef) typeDef);
+
+                            updateTypeDefWithPatch(patch, updatedDef, attributesToAdd);
+
+                            typeDefStore.updateClassificationDefByName(typeName, updatedDef);
+                        } else if (typeDef.getClass().equals(AtlasStructDef.class)) {
+                            AtlasStructDef updatedDef = new AtlasStructDef((AtlasStructDef) typeDef);
+
+                            updateTypeDefWithPatch(patch, updatedDef, attributesToAdd);
+
+                            typeDefStore.updateStructDefByName(typeName, updatedDef);
+                        } else {
+                            throw new AtlasBaseException(AtlasErrorCode.PATCH_NOT_APPLICABLE_FOR_TYPE, patch.getAction(), typeDef.getClass().getSimpleName());
+                        }
+
+                        LOG.info("adding a Java patch to update entities of {} with new mandatory attributes", typeName);
+
+                        // Java patch handler to add mandatory attributes
+                        patchManager.addPatchHandler(new AddMandatoryAttributesPatch(patchManager.getContext(), patch.getId(), typeName, attributesToAdd));
+
+                        ret = APPLIED;
+                    } finally {
+                        RequestContext.get().setInTypePatching(false);
+
+                        RequestContext.clear();
+                    }
+                }
+            } else {
+                LOG.info("patch skipped: typeName={}; applyToVersion={}; updateToVersion={}",
+                        patch.getTypeName(), patch.getApplyToVersion(), patch.getUpdateToVersion());
+
+                ret = SKIPPED;
+
+            }
+
+            return ret;
+        }
+
+        /**
+         * Selects the attributes of the patch that can be added as mandatory attributes.
+         * An attribute is accepted only when it does not already exist in the type, is not
+         * optional, has a non-empty default value, is of a PRIMITIVE type and is not unique;
+         * every rejected attribute is logged and ignored.
+         *
+         * @param patch      typedef patch carrying the attribute definitions
+         * @param updatedDef type definition the attributes are checked against
+         * @return accepted attribute definitions; empty when none qualify or the patch has none
+         * @throws AtlasBaseException if the type of an attribute cannot be resolved
+         */
+        private List<AtlasAttributeDef> getAttributesToAdd(TypeDefPatch patch, AtlasStructDef updatedDef) throws AtlasBaseException {
+            List<AtlasAttributeDef> ret = new ArrayList<>();
+
+            // a patch without attributeDefs yields an empty result, which the caller reports as SKIPPED
+            if (patch.getAttributeDefs() != null) {
+                for (AtlasAttributeDef attributeDef : patch.getAttributeDefs()) {
+                    TypeCategory attributeType = typeRegistry.getType(attributeDef.getTypeName()).getTypeCategory();
+
+                    if (updatedDef.hasAttribute(attributeDef.getName())) {
+                        LOG.warn("AddMandatoryAttributePatchHandler(id={}, typeName={}, attribute={}): already exists in type {}. Ignoring attribute", patch.getId(), patch.getTypeName(), attributeDef.getName(), updatedDef.getName());
+                    } else if (attributeDef.getIsOptional()) {
+                        LOG.warn("AddMandatoryAttributePatchHandler(id={}, typeName={}, attribute={}): is not mandatory attribute. Ignoring attribute", patch.getId(), patch.getTypeName(), attributeDef.getName());
+                    } else if (StringUtils.isEmpty(attributeDef.getDefaultValue())) {
+                        LOG.warn("AddMandatoryAttributePatchHandler(id={}, typeName={}, attribute={}): default value is missing. Ignoring attribute", patch.getId(), patch.getTypeName(), attributeDef.getName());
+                    } else if (!TypeCategory.PRIMITIVE.equals(attributeType)) {
+                        LOG.warn("AddMandatoryAttributePatchHandler(id={}, typeName={}, attribute={}): type {} is not primitive. Ignoring attribute", patch.getId(), patch.getTypeName(), attributeDef.getName(), attributeDef.getTypeName());
+                    } else if (attributeDef.getIsUnique()) {
+                        // the attribute is rejected because it IS unique; the message now says so
+                        LOG.warn("AddMandatoryAttributePatchHandler(id={}, typeName={}, attribute={}): is unique. Ignoring attribute", patch.getId(), patch.getTypeName(), attributeDef.getName());
+                    } else {
+                        ret.add(attributeDef);
+                    }
+                }
+            }
+
+            return ret;
+        }
+
+        /**
+         * Adds the given attributes to the type definition copy and sets its version to the
+         * patch's updateToVersion.
+         */
+        private void updateTypeDefWithPatch(TypeDefPatch patch, AtlasStructDef updatedDef, List<AtlasAttributeDef> attributesToAdd) {
+            for (AtlasAttributeDef attributeDef : attributesToAdd) {
+                updatedDef.addAttribute(attributeDef);
+            }
+
+            updatedDef.setTypeVersion(patch.getUpdateToVersion());
+        }
+    }
+
     class UpdateAttributePatchHandler extends PatchHandler {
         public UpdateAttributePatchHandler(AtlasTypeDefStore typeDefStore, 
AtlasTypeRegistry typeRegistry) {
             super(typeDefStore, typeRegistry, new String[] { 
"UPDATE_ATTRIBUTE" });
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
index 0c13a78..27dae16 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
@@ -442,8 +442,8 @@ public class AtlasStructDefStoreV2 extends 
AtlasAbstractDefStoreV2<AtlasStructDe
                         continue;
                     }
 
-                    // new attribute - only allow if optional
-                    if (!attributeDef.getIsOptional()) {
+                    // new attribute - allow optional by default or allow 
mandatory only with typedef patch ADD_MANDATORY_ATTRIBUTE
+                    if (!attributeDef.getIsOptional() && 
!isInAddMandatoryAttributePatch()) {
                         throw new 
AtlasBaseException(AtlasErrorCode.CANNOT_ADD_MANDATORY_ATTRIBUTE, 
structDef.getName(), attributeDef.getName());
                     }
                 }
@@ -470,6 +470,11 @@ public class AtlasStructDefStoreV2 extends 
AtlasAbstractDefStoreV2<AtlasStructDe
         AtlasGraphUtilsV2.setEncodedProperty(vertex, 
encodedStructDefPropertyKey, attrNames);
     }
 
+    /**
+     * Tells whether the current request is applying the typedef patch ADD_MANDATORY_ATTRIBUTE.
+     *
+     * @return true only when the request context is in type patching and its current
+     *         type-patch action equals Constants.TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE
+     */
+    public static boolean isInAddMandatoryAttributePatch() {
+        RequestContext requestContext = RequestContext.get();
+
+        if (!requestContext.isInTypePatching()) {
+            return false;
+        }
+
+        return StringUtils.equals(Constants.TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE, requestContext.getCurrentTypePatchAction());
+    }
+
     public static void updateVertexAddReferences(AtlasStructDef structDef, 
AtlasVertex vertex,
                                                  AtlasTypeDefGraphStoreV2 
typeDefStore) throws AtlasBaseException {
         for (AtlasAttributeDef attributeDef : structDef.getAttributeDefs()) {
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java 
b/server-api/src/main/java/org/apache/atlas/RequestContext.java
index 7de3536..37d23c2 100644
--- a/server-api/src/main/java/org/apache/atlas/RequestContext.java
+++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java
@@ -75,6 +75,7 @@ public class RequestContext {
     private boolean     isInTypePatching           = false;
     private boolean     createShellEntityForNonExistingReference = false;
     private boolean     skipFailedEntities = false;
+    private String      currentTypePatchAction = "";
 
     private RequestContext() {
     }
@@ -237,6 +238,14 @@ public class RequestContext {
         this.skipFailedEntities = skipFailedEntities;
     }
 
+    /**
+     * @return the typedef patch action currently recorded on this request context
+     */
+    public String getCurrentTypePatchAction() {
+        return this.currentTypePatchAction;
+    }
+
+    /**
+     * Records the typedef patch action being applied in this request context.
+     *
+     * @param currentTypePatchAction action name of the typedef patch, e.g. the value passed by a patch handler
+     */
+    public void setCurrentTypePatchAction(String currentTypePatchAction) {
+        this.currentTypePatchAction = currentTypePatchAction;
+    }
+
     public void recordEntityUpdate(AtlasEntityHeader entity) {
         if (entity != null && entity.getGuid() != null && ! 
entitiesToSkipUpdate.contains(entity.getGuid())) {
             updatedEntities.put(entity.getGuid(), entity);

Reply via email to