This is an automated email from the ASF dual-hosted git repository. nixon pushed a commit to branch branch-1.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 3a8cda4d2da7b1bd08c8233cc9d7c5076314dfc9 Author: Ashutosh Mestry <[email protected]> AuthorDate: Thu Feb 7 22:17:18 2019 -0800 ATLAS-3046: Classification Updater tool. Unique name used. --- .../org/apache/atlas/tools/BulkFetchAndUpdate.java | 99 ++++++++++++++++------ 1 file changed, 71 insertions(+), 28 deletions(-) diff --git a/tools/classification-updater/src/main/java/org/apache/atlas/tools/BulkFetchAndUpdate.java b/tools/classification-updater/src/main/java/org/apache/atlas/tools/BulkFetchAndUpdate.java index 1e0b66d..19f8325 100644 --- a/tools/classification-updater/src/main/java/org/apache/atlas/tools/BulkFetchAndUpdate.java +++ b/tools/classification-updater/src/main/java/org/apache/atlas/tools/BulkFetchAndUpdate.java @@ -28,6 +28,8 @@ import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntityHeader; import org.apache.atlas.model.instance.AtlasEntityHeaders; import org.apache.atlas.model.typedef.AtlasClassificationDef; +import org.apache.atlas.model.typedef.AtlasEntityDef; +import org.apache.atlas.model.typedef.AtlasStructDef; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.type.AtlasType; import org.apache.atlas.utils.AtlasJson; @@ -300,6 +302,7 @@ public class BulkFetchAndUpdate { private static final String ATTR_NAME_QUALIFIED_NAME = "qualifiedName"; private AtlasClientV2 atlasClientV2; + private Map<String, String> typeNameUniqueAttributeNameMap = new HashMap<>(); public Preparer(AtlasClientV2 atlasClientV2) { this.atlasClientV2 = atlasClientV2; @@ -322,7 +325,7 @@ public class BulkFetchAndUpdate { try { classificationDef.setGuid(null); String json = AtlasType.toJson(classificationDef); - fileWriter.write(json + "\n"); + fileWriter.write(json); } catch (Exception e) { LOG.error("Error writing classifications: {}", e); displayCrLf("Error writing classifications."); @@ -348,11 +351,11 @@ public class BulkFetchAndUpdate { } try { - AtlasEntityHeaders response = atlasClientV2.getEntityHeaders(fromTimestamp); - int guidHeaderMapSize = response.getGuidHeaderMap().size(); + AtlasEntityHeaders entityHeaders = atlasClientV2.getEntityHeaders(fromTimestamp); + int guidHeaderMapSize = entityHeaders.getGuidHeaderMap().size(); try { displayCrLf("Read entities: " + guidHeaderMapSize); - AtlasEntityHeaders updatedHeaders = removeEntityGuids(response); + AtlasEntityHeaders updatedHeaders = removeEntityGuids(entityHeaders); fileWriter.write(AtlasType.toJson(updatedHeaders)); displayCrLf("Writing entities: " + updatedHeaders.getGuidHeaderMap().size()); @@ -368,12 +371,17 @@ public class BulkFetchAndUpdate { } private AtlasEntityHeaders removeEntityGuids(AtlasEntityHeaders headers) { - Map<String, AtlasEntityHeader> qualifiedNameHeaderMap = new HashMap<>(); + Map<String, AtlasEntityHeader> uniqueNameEntityHeaderMap = new HashMap<>(); for (AtlasEntityHeader header : headers.getGuidHeaderMap().values()) { - String qualifiedName = getQualifiedName(header); - displayCrLf("Processing: " + qualifiedName); + String uniqueName = getUniqueName(header); + if (StringUtils.isEmpty(uniqueName)) { + displayCrLf("UniqueName is empty. Ignoring: " + header.getGuid()); + LOG.warn("UniqueName is empty. Ignoring: {}", AtlasJson.toJson(header)); + continue; + } + displayCrLf("Processing: " + uniqueName); if (header.getStatus() == DELETED) { continue; } @@ -383,37 +391,72 @@ public class BulkFetchAndUpdate { continue; } - boolean keyFound = qualifiedNameHeaderMap.containsKey(qualifiedName); + String key = String.format("%s:%s", header.getTypeName(), uniqueName); + boolean keyFound = uniqueNameEntityHeaderMap.containsKey(key); if (!keyFound) { - qualifiedNameHeaderMap.put(qualifiedName, header); + uniqueNameEntityHeaderMap.put(key, header); } - AtlasEntityHeader currentHeader = qualifiedNameHeaderMap.get(qualifiedName); - for (AtlasClassification c : header.getClassifications()) { - c.setEntityGuid(null); - - if (keyFound) { - boolean found = - currentHeader.getClassifications().stream().anyMatch(ox -> ox.getTypeName().equals(c.getTypeName())); - if (!found) { - currentHeader.getClassifications().add(c); - } else { - displayCrLf("Ignoring: " + c.toString()); - LOG.warn("Ignoring: {}", AtlasJson.toJson(c)); - } + updateClassificationsForHeader(header, uniqueNameEntityHeaderMap.get(key), keyFound); + displayCrLf("Processing: " + uniqueName); + } + + displayCrLf("Processed: " + uniqueNameEntityHeaderMap.size()); + headers.setGuidHeaderMap(uniqueNameEntityHeaderMap); + return headers; + } + + private void updateClassificationsForHeader(AtlasEntityHeader header, AtlasEntityHeader currentHeader, boolean keyFound) { + for (AtlasClassification c : header.getClassifications()) { + c.setEntityGuid(null); + + if (keyFound) { + boolean found = + currentHeader.getClassifications().stream().anyMatch(ox -> ox.getTypeName().equals(c.getTypeName())); + if (!found) { + currentHeader.getClassifications().add(c); + } else { + displayCrLf("Ignoring: " + c.toString()); + LOG.warn("Ignoring: {}", AtlasJson.toJson(c)); } } + } + } - displayCrLf("Processing: " + qualifiedName); + private String getUniqueName(AtlasEntityHeader header) { + String uniqueAttributeName = ATTR_NAME_QUALIFIED_NAME; + if (!header.getAttributes().containsKey(ATTR_NAME_QUALIFIED_NAME)) { + uniqueAttributeName = getUniqueAttribute(header.getTypeName()); } - displayCrLf("Processed: " + qualifiedNameHeaderMap.size()); - headers.setGuidHeaderMap(qualifiedNameHeaderMap); - return headers; + Object attrValue = header.getAttribute(uniqueAttributeName); + if (attrValue == null) { + LOG.warn("Unique Attribute Value: empty: {}", AtlasJson.toJson(header)); + return StringUtils.EMPTY; + } + + return attrValue.toString(); } - private String getQualifiedName(AtlasEntityHeader header) { - return (String) header.getAttribute(ATTR_NAME_QUALIFIED_NAME); + private String getUniqueAttribute(String typeName) { + try { + if (typeNameUniqueAttributeNameMap.containsKey(typeName)) { + return typeNameUniqueAttributeNameMap.get(typeName); + } + + AtlasEntityDef entityDef = atlasClientV2.getEntityDefByName(typeName); + for (AtlasStructDef.AtlasAttributeDef ad : entityDef.getAttributeDefs()) { + if (ad.getIsUnique()) { + typeNameUniqueAttributeNameMap.put(typeName, ad.getName()); + return ad.getName(); + } + } + } catch (AtlasServiceException e) { + LOG.error("Error fetching type: {}", typeName, e); + return null; + } + + return null; } private List<AtlasClassificationDef> getAllClassificationsDefs() throws Exception {
