This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 46458a6 ATLAS-4181: Provide option to add mandatory attribute to
existing entity definition
46458a6 is described below
commit 46458a6c088313beeb11f769541ef3f0cac9910e
Author: Radhika Kundam <[email protected]>
AuthorDate: Fri Mar 5 09:33:48 2021 -0800
ATLAS-4181: Provide option to add mandatory attribute to existing entity
definition
Signed-off-by: Madhan Neethiraj <[email protected]>
(cherry picked from commit d653cea3c26b025ad7d6c37a2df41fff745fd952)
---
.../org/apache/atlas/repository/Constants.java | 5 +
.../graphdb/janus/AtlasSolrQueryBuilderTest.java | 2 +-
.../patches/AddMandatoryAttributesPatch.java | 145 +++++++++++++++++++++
.../bootstrap/AtlasTypeDefStoreInitializer.java | 113 +++++++++++++++-
.../store/graph/v2/AtlasStructDefStoreV2.java | 9 +-
.../main/java/org/apache/atlas/RequestContext.java | 9 ++
6 files changed, 279 insertions(+), 4 deletions(-)
diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java
b/common/src/main/java/org/apache/atlas/repository/Constants.java
index 61abfca..771287f 100644
--- a/common/src/main/java/org/apache/atlas/repository/Constants.java
+++ b/common/src/main/java/org/apache/atlas/repository/Constants.java
@@ -203,6 +203,11 @@ public final class Constants {
public static final Integer INCOMPLETE_ENTITY_VALUE = Integer.valueOf(1);
/*
+ * typedef patch constants
+ */
+ public static final String TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE =
"ADD_MANDATORY_ATTRIBUTE";
+
+ /*
* All supported file-format extensions for Bulk Imports through file
upload
*/
public enum SupportedFileExtensions { XLSX, XLS, CSV }
diff --git
a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasSolrQueryBuilderTest.java
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasSolrQueryBuilderTest.java
index 06c7221..c2acc5b 100644
---
a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasSolrQueryBuilderTest.java
+++
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasSolrQueryBuilderTest.java
@@ -200,7 +200,7 @@ public class AtlasSolrQueryBuilderTest {
processSearchParameters(fileName, underTest);
- Assert.assertEquals(underTest.build(), "+t10 AND
-__state_index:DELETED AND +__typeName__index:(hive_table ) AND ( (
+created__index:[ * TO100} ) )");
+ Assert.assertEquals(underTest.build(), "+t10 AND
-__state_index:DELETED AND +__typeName__index:(hive_table ) AND ( (
+created__index:[ * TO 100} ) )");
}
@Test
diff --git
a/repository/src/main/java/org/apache/atlas/repository/patches/AddMandatoryAttributesPatch.java
b/repository/src/main/java/org/apache/atlas/repository/patches/AddMandatoryAttributesPatch.java
new file mode 100644
index 0000000..3102516
--- /dev/null
+++
b/repository/src/main/java/org/apache/atlas/repository/patches/AddMandatoryAttributesPatch.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED;
+import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
+
+public class AddMandatoryAttributesPatch extends AtlasPatchHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(AddMandatoryAttributesPatch.class);
+
+ private static final String PATCH_ID = "JAVA_PATCH_0000_008";
+ private static final String PATCH_DESCRIPTION = "Add mandatory attributes
for all existing entities for given typeName";
+
+ private final PatchContext context;
+ private final String typeName;
+ private final List<AtlasAttributeDef> attributesToAdd;
+
+ public AddMandatoryAttributesPatch(PatchContext context, String
typedefPatchId, String typeName, List<AtlasAttributeDef> attributesToAdd) {
+ super(context.getPatchRegistry(), PATCH_ID + "_" + typedefPatchId,
PATCH_DESCRIPTION);
+
+ this.context = context;
+ this.typeName = typeName;
+ this.attributesToAdd = attributesToAdd;
+ }
+
+ @Override
+ public void apply() throws AtlasBaseException {
+ LOG.info("==> MandatoryAttributePatch.apply(): patchId={}",
getPatchId());
+
+ ConcurrentPatchProcessor patchProcessor = new
AddMandatoryAttributesPatchProcessor(context, typeName, attributesToAdd);
+
+ patchProcessor.apply();
+
+ setStatus(APPLIED);
+
+ LOG.info("<== MandatoryAttributePatch.apply(): patchId={}, status={}",
getPatchId(), getStatus());
+ }
+
+ public static class AddMandatoryAttributesPatchProcessor extends
ConcurrentPatchProcessor {
+ private final String typeName;
+ private final Set<String> typeAndAllSubTypes;
+ private final List<AtlasAttributeDef> attributesToAdd;
+
+ public AddMandatoryAttributesPatchProcessor(PatchContext context,
String typeName, List<AtlasAttributeDef> attributesToAdd) {
+ super(context);
+
+ AtlasEntityType entityType =
getTypeRegistry().getEntityTypeByName(typeName);
+
+ this.typeName = typeName;
+ this.attributesToAdd = attributesToAdd;
+
+ if (entityType != null) {
+ this.typeAndAllSubTypes = entityType.getTypeAndAllSubTypes();
+ } else {
+ LOG.warn("AddMandatoryAttributesPatchProcessor(): failed to
find entity-type {}", typeName);
+
+ this.typeAndAllSubTypes = Collections.emptySet();
+ }
+ }
+
+ @Override
+ public void submitVerticesToUpdate(WorkItemManager manager) {
+ if (CollectionUtils.isNotEmpty(typeAndAllSubTypes)) {
+ LOG.info("Entity types to be updated with mandatory
attributes: {}", typeAndAllSubTypes.size());
+
+ for (String typeName : typeAndAllSubTypes) {
+ LOG.info("finding entities of type {}", typeName);
+
+ AtlasGraph graph = getGraph();
+ Iterable<Object> vertexIds =
graph.query().has(ENTITY_TYPE_PROPERTY_KEY, typeName).vertexIds();
+ int count = 0;
+
+ for (Iterator<Object> iterator = vertexIds.iterator();
iterator.hasNext(); ) {
+ Object vertexId = iterator.next();
+
+ manager.checkProduce(vertexId);
+
+ count++;
+ }
+
+ LOG.info("found {} entities of type {}", count, typeName);
+ }
+ }
+ }
+
+ @Override
+ protected void processVertexItem(Long vertexId, AtlasVertex vertex,
String typeName, AtlasEntityType entityType) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("==>
AddMandatoryAttributesPatchProcessor.processVertexItem(typeName={},
vertexId={})", typeName, vertexId);
+ }
+
+ for (AtlasAttributeDef attributeDef : attributesToAdd) {
+ AtlasAttribute attribute =
entityType.getAttribute(attributeDef.getName());
+
+ if (attribute != null) {
+ Object existingValue =
vertex.getProperty(attribute.getVertexPropertyName(), Object.class);
+
+ if (existingValue == null) {
+ vertex.setProperty(attribute.getVertexPropertyName(),
attributeDef.getDefaultValue());
+ }
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("<==
AddMandatoryAttributesPatchProcessor.processVertexItem(typeName={},
vertexId={})", typeName, vertexId);
+ }
+ }
+
+ @Override
+ protected void prepareForExecution() {
+ //do nothing
+ }
+ }
+}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
index 676a0aa..89e9422 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
@@ -28,6 +28,7 @@ import org.apache.atlas.authorize.AtlasAuthorizerFactory;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
+import org.apache.atlas.model.TypeCategory;
import org.apache.atlas.model.patches.AtlasPatch.PatchStatus;
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
import org.apache.atlas.model.typedef.AtlasBusinessMetadataDef;
@@ -41,8 +42,10 @@ import
org.apache.atlas.model.typedef.AtlasRelationshipEndDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.patches.AddMandatoryAttributesPatch;
import org.apache.atlas.repository.patches.SuperTypesUpdatePatch;
import org.apache.atlas.repository.patches.AtlasPatchManager;
import org.apache.atlas.repository.patches.AtlasPatchRegistry;
@@ -454,7 +457,8 @@ public class AtlasTypeDefStoreInitializer implements
ActiveStateChangeHandler {
new UpdateTypeDefOptionsPatchHandler(typeDefStore,
typeRegistry),
new SetServiceTypePatchHandler(typeDefStore, typeRegistry),
new UpdateAttributeMetadataHandler(typeDefStore,
typeRegistry),
- new AddSuperTypePatchHandler(typeDefStore, typeRegistry)
+ new AddSuperTypePatchHandler(typeDefStore, typeRegistry),
+ new AddMandatoryAttributePatchHandler(typeDefStore,
typeRegistry)
};
Map<String, PatchHandler> patchHandlerRegistry = new HashMap<>();
@@ -787,6 +791,113 @@ public class AtlasTypeDefStoreInitializer implements
ActiveStateChangeHandler {
}
}
+ class AddMandatoryAttributePatchHandler extends PatchHandler {
+ public AddMandatoryAttributePatchHandler(AtlasTypeDefStore
typeDefStore, AtlasTypeRegistry typeRegistry) {
+ super(typeDefStore, typeRegistry, new String[] {
Constants.TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE });
+ }
+
+ @Override
+ public PatchStatus applyPatch(TypeDefPatch patch) throws
AtlasBaseException {
+ String typeName = patch.getTypeName();
+ AtlasBaseTypeDef typeDef =
typeRegistry.getTypeDefByName(typeName);
+ PatchStatus ret;
+
+ if (typeDef == null) {
+ throw new
AtlasBaseException(AtlasErrorCode.PATCH_FOR_UNKNOWN_TYPE, patch.getAction(),
typeName);
+ }
+
+ if (isPatchApplicable(patch, typeDef)) {
+ List<AtlasAttributeDef> attributesToAdd =
getAttributesToAdd(patch, (AtlasStructDef) typeDef);
+
+ if (CollectionUtils.isEmpty(attributesToAdd)) {
+ LOG.info("patch skipped: typeName={}; mandatory attributes
are not valid in patch {}",patch.getTypeName(), patch.getId());
+
+ ret = SKIPPED;
+ } else {
+ try {
+ RequestContext.get().setInTypePatching(true);
+
+
RequestContext.get().setCurrentTypePatchAction(Constants.TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE);
+
+ if (typeDef.getClass().equals(AtlasEntityDef.class)) {
+ AtlasEntityDef updatedDef = new
AtlasEntityDef((AtlasEntityDef) typeDef);
+
+ updateTypeDefWithPatch(patch, updatedDef,
attributesToAdd);
+
+ typeDefStore.updateEntityDefByName(typeName,
updatedDef);
+ } else if
(typeDef.getClass().equals(AtlasClassificationDef.class)) {
+ AtlasClassificationDef updatedDef = new
AtlasClassificationDef((AtlasClassificationDef) typeDef);
+
+ updateTypeDefWithPatch(patch, updatedDef,
attributesToAdd);
+
+
typeDefStore.updateClassificationDefByName(typeName, updatedDef);
+ } else if
(typeDef.getClass().equals(AtlasStructDef.class)) {
+ AtlasStructDef updatedDef = new
AtlasStructDef((AtlasStructDef) typeDef);
+
+ updateTypeDefWithPatch(patch, updatedDef,
attributesToAdd);
+
+ typeDefStore.updateStructDefByName(typeName,
updatedDef);
+ } else {
+ throw new
AtlasBaseException(AtlasErrorCode.PATCH_NOT_APPLICABLE_FOR_TYPE,
patch.getAction(), typeDef.getClass().getSimpleName());
+ }
+
+ LOG.info("adding a Java patch to update entities of {}
with new mandatory attributes", typeName);
+
+ // Java patch handler to add mandatory attributes
+ patchManager.addPatchHandler(new
AddMandatoryAttributesPatch(patchManager.getContext(), patch.getId(), typeName,
attributesToAdd));
+
+ ret = APPLIED;
+ } finally {
+ RequestContext.get().setInTypePatching(false);
+
+ RequestContext.clear();
+ }
+ }
+ } else {
+ LOG.info("patch skipped: typeName={}; applyToVersion={};
updateToVersion={}",
+ patch.getTypeName(), patch.getApplyToVersion(),
patch.getUpdateToVersion());
+
+ ret = SKIPPED;
+
+ }
+
+ return ret;
+ }
+
+ // Collect the patch attributes that may be added as mandatory: each must
not already exist in the type, must be non-optional, must have a non-empty
default value, must be of PRIMITIVE type category and must not be unique
+ private List<AtlasAttributeDef> getAttributesToAdd(TypeDefPatch patch,
AtlasStructDef updatedDef) throws AtlasBaseException {
+ List<AtlasAttributeDef> ret = new ArrayList<>();
+
+ for (AtlasAttributeDef attributeDef : patch.getAttributeDefs()) {
+ TypeCategory attributeType =
typeRegistry.getType(attributeDef.getTypeName()).getTypeCategory();
+
+ if (updatedDef.hasAttribute(attributeDef.getName())) {
+ LOG.warn("AddMandatoryAttributePatchHandler(id={},
typeName={}, attribute={}): already exists in type {}. Ignoring attribute",
patch.getId(), patch.getTypeName(), attributeDef.getName(),
updatedDef.getName());
+ } else if (attributeDef.getIsOptional()) {
+ LOG.warn("AddMandatoryAttributePatchHandler(id={},
typeName={}, attribute={}): is not mandatory attribute. Ignoring attribute",
patch.getId(), patch.getTypeName(), attributeDef.getName());
+ } else if
(StringUtils.isEmpty(attributeDef.getDefaultValue())) {
+ LOG.warn("AddMandatoryAttributePatchHandler(id={},
typeName={}, attribute={}): default value is missing. Ignoring attribute",
patch.getId(), patch.getTypeName(), attributeDef.getName());
+ } else if (!TypeCategory.PRIMITIVE.equals(attributeType)) {
+ LOG.warn("AddMandatoryAttributePatchHandler(id={},
typeName={}, attribute={}): type {} is not primitive. Ignoring attribute",
patch.getId(), patch.getTypeName(), attributeDef.getName(),
attributeDef.getTypeName());
+ } else if (attributeDef.getIsUnique()) {
+ LOG.warn("AddMandatoryAttributePatchHandler(id={},
typeName={}, attribute={}): is unique. Ignoring attribute", patch.getId(),
patch.getTypeName(), attributeDef.getName());
+ } else {
+ ret.add(attributeDef);
+ }
+ }
+
+ return ret;
+ }
+
+ private void updateTypeDefWithPatch(TypeDefPatch patch, AtlasStructDef
updatedDef, List<AtlasAttributeDef> attributesToAdd) {
+ for (AtlasAttributeDef attributeDef : attributesToAdd) {
+ updatedDef.addAttribute(attributeDef);
+ }
+
+ updatedDef.setTypeVersion(patch.getUpdateToVersion());
+ }
+ }
+
class UpdateAttributePatchHandler extends PatchHandler {
public UpdateAttributePatchHandler(AtlasTypeDefStore typeDefStore,
AtlasTypeRegistry typeRegistry) {
super(typeDefStore, typeRegistry, new String[] {
"UPDATE_ATTRIBUTE" });
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
index 0c13a78..27dae16 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
@@ -442,8 +442,8 @@ public class AtlasStructDefStoreV2 extends
AtlasAbstractDefStoreV2<AtlasStructDe
continue;
}
- // new attribute - only allow if optional
- if (!attributeDef.getIsOptional()) {
+ // new attribute - allow optional by default or allow
mandatory only with typedef patch ADD_MANDATORY_ATTRIBUTE
+ if (!attributeDef.getIsOptional() &&
!isInAddMandatoryAttributePatch()) {
throw new
AtlasBaseException(AtlasErrorCode.CANNOT_ADD_MANDATORY_ATTRIBUTE,
structDef.getName(), attributeDef.getName());
}
}
@@ -470,6 +470,11 @@ public class AtlasStructDefStoreV2 extends
AtlasAbstractDefStoreV2<AtlasStructDe
AtlasGraphUtilsV2.setEncodedProperty(vertex,
encodedStructDefPropertyKey, attrNames);
}
+ public static boolean isInAddMandatoryAttributePatch() {
+ return RequestContext.get().isInTypePatching() &&
+
StringUtils.equals(Constants.TYPEDEF_PATCH_ADD_MANDATORY_ATTRIBUTE,
RequestContext.get().getCurrentTypePatchAction());
+ }
+
public static void updateVertexAddReferences(AtlasStructDef structDef,
AtlasVertex vertex,
AtlasTypeDefGraphStoreV2
typeDefStore) throws AtlasBaseException {
for (AtlasAttributeDef attributeDef : structDef.getAttributeDefs()) {
diff --git a/server-api/src/main/java/org/apache/atlas/RequestContext.java
b/server-api/src/main/java/org/apache/atlas/RequestContext.java
index 7de3536..37d23c2 100644
--- a/server-api/src/main/java/org/apache/atlas/RequestContext.java
+++ b/server-api/src/main/java/org/apache/atlas/RequestContext.java
@@ -75,6 +75,7 @@ public class RequestContext {
private boolean isInTypePatching = false;
private boolean createShellEntityForNonExistingReference = false;
private boolean skipFailedEntities = false;
+ private String currentTypePatchAction = "";
private RequestContext() {
}
@@ -237,6 +238,14 @@ public class RequestContext {
this.skipFailedEntities = skipFailedEntities;
}
+ public String getCurrentTypePatchAction() {
+ return currentTypePatchAction;
+ }
+
+ public void setCurrentTypePatchAction(String currentTypePatchAction) {
+ this.currentTypePatchAction = currentTypePatchAction;
+ }
+
public void recordEntityUpdate(AtlasEntityHeader entity) {
if (entity != null && entity.getGuid() != null && !
entitiesToSkipUpdate.contains(entity.getGuid())) {
updatedEntities.put(entity.getGuid(), entity);