This is an automated email from the ASF dual-hosted git repository. sarath pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new cd15f76 ATLAS-3077: Handle java patches in patch framework cd15f76 is described below commit cd15f768d6c0378925457f4447386a341c84f171 Author: Sarath Subramanian <ssubraman...@hortonworks.com> AuthorDate: Thu Mar 21 23:39:23 2019 -0700 ATLAS-3077: Handle java patches in patch framework --- .../org/apache/atlas/model/patches/AtlasPatch.java | 6 +- .../repository/graph/GraphBackedSearchIndexer.java | 14 +- .../repository/patches/AtlasJavaPatchHandler.java | 138 +++++++++++++++++ .../atlas/repository/patches/PatchContext.java | 59 ++++++++ .../patches/UniqueAttributePatchHandler.java | 163 +++++++++++++++++++++ .../bootstrap/AtlasTypeDefStoreInitializer.java | 96 ++++++++---- .../store/graph/v2/AtlasGraphUtilsV2.java | 66 +++++---- .../apache/atlas/web/resources/AdminResource.java | 2 +- 8 files changed, 475 insertions(+), 69 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/model/patches/AtlasPatch.java b/intg/src/main/java/org/apache/atlas/model/patches/AtlasPatch.java index cdf2441..ae0255f 100644 --- a/intg/src/main/java/org/apache/atlas/model/patches/AtlasPatch.java +++ b/intg/src/main/java/org/apache/atlas/model/patches/AtlasPatch.java @@ -50,12 +50,12 @@ public class AtlasPatch implements Serializable { private long updatedTime; private PatchStatus status; - public enum PatchStatus { APPLIED, SKIPPED, FAILED, UNKNOWN } + public enum PatchStatus { UNKNOWN, APPLIED, SKIPPED, FAILED } public AtlasPatch() { } - public AtlasPatch(String id, String patchName, String type, String action, PatchStatus status, String updatedBy, - String createdBy, long createdTime, long updatedTime) { + public AtlasPatch(String id, String patchName, String type, String action, PatchStatus status, + String updatedBy, String createdBy, long createdTime, long updatedTime) { this.id = id; this.description = patchName; this.type = type; diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java index 4805435..c57f8e3 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java @@ -108,7 +108,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang private boolean recomputeIndexedKeys = true; private Set<String> vertexIndexKeys = new HashSet<>(); - private enum UniqueKind { NONE, GLOBAL_UNIQUE, PER_TYPE_UNIQUE } + public enum UniqueKind { NONE, GLOBAL_UNIQUE, PER_TYPE_UNIQUE } @Inject public GraphBackedSearchIndexer(AtlasTypeRegistry typeRegistry) throws AtlasException { @@ -431,7 +431,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang return type instanceof AtlasRelationshipType; } - private Class getPrimitiveClass(String attribTypeName) { + public Class getPrimitiveClass(String attribTypeName) { String attributeTypeName = attribTypeName.toLowerCase(); switch (attributeTypeName) { @@ -461,7 +461,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang throw new IllegalArgumentException(String.format("Unknown primitive typename %s", attribTypeName)); } - private AtlasCardinality toAtlasCardinality(AtlasAttributeDef.Cardinality cardinality) { + public AtlasCardinality toAtlasCardinality(AtlasAttributeDef.Cardinality cardinality) { switch (cardinality) { case SINGLE: return SINGLE; @@ -500,8 +500,8 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang return propertyKey; } - private void createVertexIndex(AtlasGraphManagement management, String propertyName, UniqueKind uniqueKind, Class propertyClass, - AtlasCardinality cardinality, boolean createCompositeIndex, boolean createCompositeIndexWithTypeAndSuperTypes) { + public void createVertexIndex(AtlasGraphManagement management, String propertyName, UniqueKind uniqueKind, Class propertyClass, + AtlasCardinality cardinality, boolean createCompositeIndex, boolean createCompositeIndexWithTypeAndSuperTypes) { if (propertyName != null) { AtlasPropertyKey propertyKey = management.getPropertyKey(propertyName); @@ -704,7 +704,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang return !(INDEX_EXCLUSION_CLASSES.contains(propertyClass) || cardinality.isMany()); } - private void commit(AtlasGraphManagement management) throws IndexException { + public void commit(AtlasGraphManagement management) throws IndexException { try { management.commit(); @@ -715,7 +715,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang } } - private void rollback(AtlasGraphManagement management) throws IndexException { + public void rollback(AtlasGraphManagement management) throws IndexException { try { management.rollback(); diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasJavaPatchHandler.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasJavaPatchHandler.java new file mode 100644 index 0000000..470ff10 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasJavaPatchHandler.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.patches; + +import org.apache.atlas.RequestContext; +import org.apache.atlas.model.patches.AtlasPatch.PatchStatus; +import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.collections.MapUtils; + +import java.util.Map; + +import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN; +import static org.apache.atlas.repository.Constants.CREATED_BY_KEY; +import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY; +import static org.apache.atlas.repository.Constants.PATCH_DESCRIPTION_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.PATCH_ID_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.PATCH_STATE_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.PATCH_TYPE_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY; +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.findByPatchId; +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty; +import static org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2.getCurrentUser; + +public abstract class AtlasJavaPatchHandler { + public final AtlasGraph graph; + public final AtlasTypeRegistry typeRegistry; + public final Map<String, PatchStatus> patchesRegistry; + public final EntityGraphRetriever entityRetriever; + public final GraphBackedSearchIndexer indexer; + public final PatchContext context; + public final String patchId; + public final String patchDescription; + + private PatchStatus patchStatus; + + public static final String JAVA_PATCH_TYPE = "JAVA_PATCH"; + + public AtlasJavaPatchHandler(PatchContext context, String patchId, String patchDescription) { + this.context = context; + this.graph = context.getGraph(); + this.typeRegistry = context.getTypeRegistry(); + this.indexer = context.getIndexer(); + this.patchesRegistry = context.getPatchesRegistry(); + this.patchId = patchId; + this.patchDescription = patchDescription; + this.patchStatus = getPatchStatus(patchesRegistry); + this.entityRetriever = new EntityGraphRetriever(typeRegistry); + + init(); + } + + private void init() { + PatchStatus patchStatus = getPatchStatus(); + + if (patchStatus == UNKNOWN) { + AtlasVertex patchVertex = graph.addVertex(); + + setEncodedProperty(patchVertex, PATCH_ID_PROPERTY_KEY, patchId); + setEncodedProperty(patchVertex, PATCH_DESCRIPTION_PROPERTY_KEY, patchDescription); + setEncodedProperty(patchVertex, PATCH_TYPE_PROPERTY_KEY, JAVA_PATCH_TYPE); + setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, getPatchStatus().toString()); + setEncodedProperty(patchVertex, TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime()); + setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime()); + setEncodedProperty(patchVertex, CREATED_BY_KEY, getCurrentUser()); + setEncodedProperty(patchVertex, MODIFIED_BY_KEY, getCurrentUser()); + + graph.commit(); + + addToPatchesRegistry(patchId, getPatchStatus()); + } + } + + private PatchStatus getPatchStatus(Map<String, PatchStatus> patchesRegistry) { + PatchStatus ret = UNKNOWN; + + if (MapUtils.isNotEmpty(patchesRegistry) && patchesRegistry.containsKey(patchId)) { + ret = patchesRegistry.get(patchId); + } + + return ret; + } + + public void updatePatchVertex(PatchStatus patchStatus) { + AtlasVertex patchVertex = findByPatchId(patchId); + + if (patchVertex != null) { + setEncodedProperty(patchVertex, PATCH_STATE_PROPERTY_KEY, patchStatus.toString()); + setEncodedProperty(patchVertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime()); + setEncodedProperty(patchVertex, MODIFIED_BY_KEY, getCurrentUser()); + + graph.commit(); + + addToPatchesRegistry(getPatchId(), getPatchStatus()); + } + } + + public PatchStatus getPatchStatus() { + return patchStatus; + } + + public void addToPatchesRegistry(String patchId, PatchStatus status) { + getPatchesRegistry().put(patchId, status); + } + + public void setPatchStatus(PatchStatus patchStatus) { + this.patchStatus = patchStatus; + } + + public String getPatchId() { + return patchId; + } + + public Map<String, PatchStatus> getPatchesRegistry() { + return patchesRegistry; + } + + public abstract void applyPatch(); +} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java b/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java new file mode 100644 index 0000000..a60422b --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/patches/PatchContext.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.patches; + +import org.apache.atlas.model.patches.AtlasPatch.PatchStatus; +import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.type.AtlasTypeRegistry; + +import java.util.Map; + +/** + * Patch context for typedef and java patches. + */ +public class PatchContext { + private final AtlasGraph graph; + private final AtlasTypeRegistry typeRegistry; + private final GraphBackedSearchIndexer indexer; + private final Map<String, PatchStatus> patchesRegistry; + + public PatchContext(AtlasGraph graph, AtlasTypeRegistry typeRegistry, GraphBackedSearchIndexer indexer, + Map<String, PatchStatus> patchesRegistry) { + this.graph = graph; + this.typeRegistry = typeRegistry; + this.indexer = indexer; + this.patchesRegistry = patchesRegistry; + } + + public AtlasGraph getGraph() { + return graph; + } + + public AtlasTypeRegistry getTypeRegistry() { + return typeRegistry; + } + + public GraphBackedSearchIndexer getIndexer() { + return indexer; + } + + public Map<String, PatchStatus> getPatchesRegistry() { + return patchesRegistry; + } +} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatchHandler.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatchHandler.java new file mode 100644 index 0000000..0c65ef1 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatchHandler.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.patches; + +import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; +import org.apache.atlas.repository.IndexException; +import org.apache.atlas.repository.graph.GraphBackedSearchIndexer.UniqueKind; +import org.apache.atlas.repository.graphdb.AtlasCardinality; +import org.apache.atlas.repository.graphdb.AtlasGraphManagement; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasStructType.AtlasAttribute; +import org.apache.commons.collections.MapUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; + +import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.APPLIED; +import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.FAILED; +import static org.apache.atlas.repository.graph.GraphHelper.getGuid; +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.findActiveEntityVerticesByType; +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty; + +public class UniqueAttributePatchHandler extends AtlasJavaPatchHandler { + private static final String PATCH_ID = "JAVA_PATCH_0000_001"; + private static final String PATCH_DESCRIPTION = "Add new vertex property for each unique attribute of active entities"; + private static final Logger LOG = LoggerFactory.getLogger(UniqueAttributePatchHandler.class); + + public UniqueAttributePatchHandler(PatchContext context) { + super(context, PATCH_ID, PATCH_DESCRIPTION); + } + + @Override + public void applyPatch() { + Collection<AtlasEntityType> allEntityTypes = typeRegistry.getAllEntityTypes(); + boolean patchFailed = false; + + for (AtlasEntityType entityType : allEntityTypes) { + String typeName = entityType.getTypeName(); + Map<String, AtlasAttribute> uniqAttributes = entityType.getUniqAttributes(); + int entitiesProcessed = 0; + + LOG.info("Applying java patch: {} for type: {}", getPatchId(), typeName); + + if (MapUtils.isNotEmpty(uniqAttributes)) { + Collection<AtlasAttribute> attributes = uniqAttributes.values(); + + try { + // register unique attribute property keys in graph + registerUniqueAttrPropertyKeys(attributes); + + Iterator<AtlasVertex> iterator = findActiveEntityVerticesByType(typeName); + + while (iterator.hasNext()) { + AtlasVertex entityVertex = iterator.next(); + boolean patchApplied = false; + + for (AtlasAttribute attribute : attributes) { + String uniquePropertyKey = attribute.getVertexUniquePropertyName(); + Collection<? extends String> propertyKeys = entityVertex.getPropertyKeys(); + + if (!propertyKeys.contains(uniquePropertyKey)) { + String propertyKey = attribute.getVertexPropertyName(); + AtlasAttributeDef attributeDef = attribute.getAttributeDef(); + Object uniqAttrValue = entityRetriever.mapVertexToPrimitive(entityVertex, propertyKey, attributeDef); + + // add the unique attribute property to vertex + setEncodedProperty(entityVertex, uniquePropertyKey, uniqAttrValue); + + try { + graph.commit(); + + patchApplied = true; + + if (LOG.isDebugEnabled()) { + LOG.debug("{}: Added unique attribute property: {} to entity: {} ({})", + PATCH_ID, uniquePropertyKey, getGuid(entityVertex), typeName); + } + } catch (Throwable t) { + LOG.warn("Java patch ({}): failed to update entity guid: {}; typeName: {}; attrName: {}; attrValue: {}", + getPatchId(), getGuid(entityVertex), typeName, attribute.getName(), uniqAttrValue); + + continue; + } + } + } + + if (patchApplied) { + entitiesProcessed++; + } + + if (entitiesProcessed % 1000 == 0) { + LOG.info("Java patch: {} : processed {} {} entities.", getPatchId(), entitiesProcessed, typeName); + } + } + } catch (IndexException e) { + LOG.error("Java patch: {} failed! error: {}", getPatchId(), e); + + patchFailed = true; + + break; + } + } + + LOG.info("Applied java patch ({}) for type: {}; Total processed: {}", getPatchId(), typeName, entitiesProcessed); + } + + if (patchFailed) { + setPatchStatus(FAILED); + } else { + setPatchStatus(APPLIED); + } + + LOG.info("Applied java patch: {}; status: {}", getPatchId(), getPatchStatus()); + + updatePatchVertex(getPatchStatus()); + } + + private void registerUniqueAttrPropertyKeys(Collection<AtlasAttribute> attributes) throws IndexException { + AtlasGraphManagement management = graph.getManagementSystem(); + boolean idxCreated = false; + + for (AtlasAttribute attribute : attributes) { + String uniquePropertyName = attribute.getVertexUniquePropertyName(); + boolean uniquePropertyNameExists = management.getPropertyKey(uniquePropertyName) != null; + + if (!uniquePropertyNameExists) { + AtlasAttributeDef attributeDef = attribute.getAttributeDef(); + boolean isIndexable = attributeDef.getIsIndexable(); + String attribTypeName = attributeDef.getTypeName(); + Class propertyClass = indexer.getPrimitiveClass(attribTypeName); + AtlasCardinality cardinality = indexer.toAtlasCardinality(attributeDef.getCardinality()); + + indexer.createVertexIndex(management, uniquePropertyName, UniqueKind.NONE, propertyClass, cardinality, isIndexable, true); + + idxCreated = true; + } + } + + //Commit indexes + if (idxCreated) { + indexer.commit(management); + } + } +} \ No newline at end of file diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java index 337a6db..78f3faf 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java @@ -42,8 +42,12 @@ import org.apache.atlas.model.typedef.AtlasStructDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.repository.graph.AtlasGraphProvider; +import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.patches.AtlasJavaPatchHandler; +import org.apache.atlas.repository.patches.PatchContext; +import org.apache.atlas.repository.patches.UniqueAttributePatchHandler; import org.apache.atlas.repository.store.graph.v2.AtlasTypeDefGraphStoreV2; import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.type.AtlasEntityType; @@ -90,7 +94,7 @@ import static org.apache.atlas.repository.Constants.PATCH_STATE_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.PATCH_TYPE_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY; import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.findByPatchId; -import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.initPatchesRegistry; +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getPatchesRegistry; import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty; /** @@ -105,18 +109,20 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { public static final String RELATIONSHIP_SWAP_ENDS = "swapEnds"; public static final String TYPEDEF_PATCH_TYPE = "TYPEDEF_PATCH"; - private final AtlasTypeDefStore atlasTypeDefStore; - private final AtlasTypeRegistry atlasTypeRegistry; - private final AtlasGraph atlasGraph; - private final Configuration conf; + private final AtlasTypeDefStore typeDefStore; + private final AtlasTypeRegistry typeRegistry; + private final AtlasGraph graph; + private final Configuration conf; + private final GraphBackedSearchIndexer indexer; @Inject - public AtlasTypeDefStoreInitializer(AtlasTypeDefStore atlasTypeDefStore, AtlasTypeRegistry atlasTypeRegistry, - AtlasGraph atlasGraph, Configuration conf) { - this.atlasTypeDefStore = atlasTypeDefStore; - this.atlasTypeRegistry = atlasTypeRegistry; - this.atlasGraph = atlasGraph; - this.conf = conf; + public AtlasTypeDefStoreInitializer(AtlasTypeDefStore typeDefStore, AtlasTypeRegistry typeRegistry, + AtlasGraph graph, Configuration conf, GraphBackedSearchIndexer indexer) { + this.typeDefStore = typeDefStore; + this.typeRegistry = typeRegistry; + this.graph = graph; + this.conf = conf; + this.indexer = indexer; } @PostConstruct @@ -124,7 +130,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { LOG.info("==> AtlasTypeDefStoreInitializer.init()"); if (!HAConfiguration.isHAEnabled(conf)) { - atlasTypeDefStore.init(); + typeDefStore.init(); loadBootstrapTypeDefs(); try { @@ -149,8 +155,9 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { private void loadBootstrapTypeDefs() { LOG.info("==> AtlasTypeDefStoreInitializer.loadBootstrapTypeDefs()"); - String atlasHomeDir = System.getProperty("atlas.home"); - String modelsDirName = (StringUtils.isEmpty(atlasHomeDir) ? "." : atlasHomeDir) + File.separator + "models"; + String atlasHomeDir = System.getProperty("atlas.home"); + String modelsDirName = (StringUtils.isEmpty(atlasHomeDir) ? "." : atlasHomeDir) + File.separator + "models"; + PatchContext patchContext = initPatchContext(); if (modelsDirName == null || modelsDirName.length() == 0) { LOG.info("Types directory {} does not exist or not readable or has no typedef files", modelsDirName); @@ -158,7 +165,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { // look for folders we need to load models from File topModeltypesDir = new File(modelsDirName); File[] modelsDirContents = topModeltypesDir.exists() ? topModeltypesDir.listFiles() : null; - Map<String, PatchStatus> patchesRegistry = initPatchesRegistry(); + if (modelsDirContents != null && modelsDirContents.length > 0) { Arrays.sort(modelsDirContents); @@ -169,23 +176,49 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { continue; } else if (!folder.getName().equals(PATCHES_FOLDER_NAME)){ // load the models alphabetically in the subfolders apart from patches - loadModelsInFolder(folder, patchesRegistry); + loadModelsInFolder(folder, patchContext); } } } // load any files in the top models folder and any associated patches. - loadModelsInFolder(topModeltypesDir, patchesRegistry); + loadModelsInFolder(topModeltypesDir, patchContext); } + + // apply java patches + applyJavaPatches(patchContext); + LOG.info("<== AtlasTypeDefStoreInitializer.loadBootstrapTypeDefs()"); } + private void applyJavaPatches(PatchContext context) { + // register java patches + AtlasJavaPatchHandler[] patches = new AtlasJavaPatchHandler[] { new UniqueAttributePatchHandler(context) }; + + // apply java patches + for (AtlasJavaPatchHandler patch : patches) { + PatchStatus patchStatus = patch.getPatchStatus(); + + if (patchStatus == APPLIED || patchStatus == SKIPPED) { + LOG.info("Ignoring java patch: {}; status: {}", patch.getPatchId(), patchStatus); + } else { + LOG.info("Applying java patch: {}; status: {}", patch.getPatchId(), patchStatus); + + patch.applyPatch(); + } + } + } + + public PatchContext initPatchContext() { + return new PatchContext(graph, typeRegistry, indexer, getPatchesRegistry()); + } + /** * Load all the model files in the supplied folder followed by the contents of the patches folder. * @param typesDir - * @param patchesRegistry + * @param context */ - private void loadModelsInFolder(File typesDir, Map<String, PatchStatus> patchesRegistry) { + private void loadModelsInFolder(File typesDir, PatchContext context) { LOG.info("==> AtlasTypeDefStoreInitializer({})", typesDir); String typesDirName = typesDir.getName(); @@ -210,11 +243,11 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { continue; } - AtlasTypesDef typesToCreate = getTypesToCreate(typesDef, atlasTypeRegistry); - AtlasTypesDef typesToUpdate = getTypesToUpdate(typesDef, atlasTypeRegistry, true); + AtlasTypesDef typesToCreate = getTypesToCreate(typesDef, typeRegistry); + AtlasTypesDef typesToUpdate = getTypesToUpdate(typesDef, typeRegistry, true); if (!typesToCreate.isEmpty() || !typesToUpdate.isEmpty()) { - atlasTypeDefStore.createUpdateTypesDef(typesToCreate, typesToUpdate); + typeDefStore.createUpdateTypesDef(typesToCreate, typesToUpdate); LOG.info("Created/Updated types defined in file {}", typeDefFile.getAbsolutePath()); } else { @@ -227,7 +260,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { } } - applyTypePatches(typesDir.getPath(), patchesRegistry); + applyTypePatches(typesDir.getPath(), context); } LOG.info("<== AtlasTypeDefStoreInitializer({})", typesDir); } @@ -367,7 +400,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { LOG.info("==> AtlasTypeDefStoreInitializer.instanceIsActive()"); try { - atlasTypeDefStore.init(); + typeDefStore.init(); loadBootstrapTypeDefs(); @@ -425,10 +458,11 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { return ret; } - private void applyTypePatches(String typesDirName, Map<String, PatchStatus> patchesRegistry) { + private void applyTypePatches(String typesDirName, PatchContext context) { String typePatchesDirName = typesDirName + File.separator + PATCHES_FOLDER_NAME; File typePatchesDir = new File(typePatchesDirName); File[] typePatchFiles = typePatchesDir.exists() ? typePatchesDir.listFiles() : null; + Map<String, PatchStatus> patchesRegistry = context.getPatchesRegistry(); if (typePatchFiles == null || typePatchFiles.length == 0) { LOG.info("Type patches directory {} does not exist or not readable or has no patches", typePatchesDirName); @@ -439,11 +473,11 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { Arrays.sort(typePatchFiles); PatchHandler[] patchHandlers = new PatchHandler[] { - new AddAttributePatchHandler(atlasTypeDefStore, atlasTypeRegistry), - new UpdateAttributePatchHandler(atlasTypeDefStore, atlasTypeRegistry), - new RemoveLegacyRefAttributesPatchHandler(atlasTypeDefStore, atlasTypeRegistry), - new UpdateTypeDefOptionsPatchHandler(atlasTypeDefStore, atlasTypeRegistry), - new SetServiceTypePatchHandler(atlasTypeDefStore, atlasTypeRegistry) + new AddAttributePatchHandler(typeDefStore, typeRegistry), + new UpdateAttributePatchHandler(typeDefStore, typeRegistry), + new RemoveLegacyRefAttributesPatchHandler(typeDefStore, typeRegistry), + new UpdateTypeDefOptionsPatchHandler(typeDefStore, typeRegistry), + new SetServiceTypePatchHandler(typeDefStore, typeRegistry) }; Map<String, PatchHandler> patchHandlerRegistry = new HashMap<>(); @@ -534,7 +568,7 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { private void createOrUpdatePatchVertex(TypeDefPatch patch, PatchStatus patchStatus, Map<String, PatchStatus> patchesRegistry) { String patchId = patch.getId(); boolean isPatchRegistered = MapUtils.isNotEmpty(patchesRegistry) && patchesRegistry.containsKey(patchId); - AtlasVertex patchVertex = isPatchRegistered ? findByPatchId(patchId) : atlasGraph.addVertex(); + AtlasVertex patchVertex = isPatchRegistered ? findByPatchId(patchId) : graph.addVertex(); setEncodedProperty(patchVertex, PATCH_ID_PROPERTY_KEY, patchId); setEncodedProperty(patchVertex, PATCH_DESCRIPTION_PROPERTY_KEY, patch.getDescription()); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java index 2d5fd97..dda324b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasGraphUtilsV2.java @@ -23,11 +23,11 @@ import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.GraphTransactionInterceptor; import org.apache.atlas.RequestContext; import org.apache.atlas.SortOrder; -import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.discovery.SearchProcessor; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.Status; import org.apache.atlas.model.patches.AtlasPatch; import org.apache.atlas.model.patches.AtlasPatch.AtlasPatches; import org.apache.atlas.model.patches.AtlasPatch.PatchStatus; @@ -65,6 +65,7 @@ import java.util.Set; import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN; import static org.apache.atlas.repository.Constants.CREATED_BY_KEY; +import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_DEFAULT; import static org.apache.atlas.repository.Constants.INDEX_SEARCH_VERTEX_PREFIX_PROPERTY; import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY; @@ -74,8 +75,11 @@ import static org.apache.atlas.repository.Constants.PATCH_ID_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.PATCH_DESCRIPTION_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.PATCH_STATE_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.PATCH_TYPE_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY; +import static org.apache.atlas.repository.Constants.TYPE_NAME_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.VERTEX_INDEX; +import static org.apache.atlas.repository.graph.AtlasGraphProvider.getGraphInstance; import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.SortOrder.*; /** @@ -133,7 +137,7 @@ public class AtlasGraphUtilsV2 { } public static String getTypeName(AtlasElement element) { - return element.getProperty(Constants.ENTITY_TYPE_PROPERTY_KEY, String.class); + return element.getProperty(ENTITY_TYPE_PROPERTY_KEY, String.class); } public static String getEdgeLabel(String fromNode, String toNode) { @@ -341,7 +345,7 @@ public class AtlasGraphUtilsV2 { public static AtlasVertex findByPatchId(String patchId) { AtlasVertex ret = null; String indexQuery = getIndexSearchPrefix() + "\"" + PATCH_ID_PROPERTY_KEY + "\" : ("+ patchId +")"; - Iterator<Result<Object, Object>> results = AtlasGraphProvider.getGraphInstance().indexQuery(VERTEX_INDEX, indexQuery).vertices(); + Iterator<Result<Object, Object>> results = getGraphInstance().indexQuery(VERTEX_INDEX, indexQuery).vertices(); while (results != null && results.hasNext()) { ret = results.next().getVertex(); @@ -358,7 +362,7 @@ public class AtlasGraphUtilsV2 { AtlasVertex ret = GraphTransactionInterceptor.getVertexFromCache(guid); if (ret == null) { - AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query() + AtlasGraphQuery query = getGraphInstance().query() .has(Constants.GUID_PROPERTY_KEY, guid); Iterator<AtlasVertex> results = query.vertices().iterator(); @@ -386,9 +390,9 @@ public class AtlasGraphUtilsV2 { } public static boolean typeHasInstanceVertex(String typeName) throws AtlasBaseException { - AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance() + AtlasGraphQuery query = getGraphInstance() .query() - .has(Constants.TYPE_NAME_PROPERTY_KEY, AtlasGraphQuery.ComparisionOperator.EQUAL, typeName); + .has(TYPE_NAME_PROPERTY_KEY, AtlasGraphQuery.ComparisionOperator.EQUAL, typeName); Iterator<AtlasVertex> results = query.vertices().iterator(); @@ -404,8 +408,8 @@ public class AtlasGraphUtilsV2 { public static AtlasVertex findByTypeAndUniquePropertyName(String typeName, String propertyName, Object attrVal) { MetricRecorder metric = RequestContext.get().startMetricRecord("findByTypeAndUniquePropertyName"); - AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query() - .has(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName) + AtlasGraphQuery query = getGraphInstance().query() + .has(ENTITY_TYPE_PROPERTY_KEY, typeName) .has(propertyName, attrVal); Iterator<AtlasVertex> results = query.vertices().iterator(); @@ -420,7 +424,7 @@ public class AtlasGraphUtilsV2 { public static AtlasVertex findBySuperTypeAndUniquePropertyName(String typeName, String propertyName, Object attrVal) { MetricRecorder metric = RequestContext.get().startMetricRecord("findBySuperTypeAndUniquePropertyName"); - AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query() + AtlasGraphQuery query = getGraphInstance().query() .has(Constants.SUPER_TYPES_PROPERTY_KEY, typeName) .has(propertyName, attrVal); @@ -436,10 +440,10 @@ public class AtlasGraphUtilsV2 { public static AtlasVertex findByTypeAndPropertyName(String typeName, String propertyName, Object attrVal) { MetricRecorder metric = RequestContext.get().startMetricRecord("findByTypeAndPropertyName"); - AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query() - .has(Constants.ENTITY_TYPE_PROPERTY_KEY, typeName) + AtlasGraphQuery query = getGraphInstance().query() + .has(ENTITY_TYPE_PROPERTY_KEY, typeName) .has(propertyName, attrVal) - .has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name()); + .has(STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name()); Iterator<AtlasVertex> results = query.vertices().iterator(); @@ -453,10 +457,10 @@ public class AtlasGraphUtilsV2 { public static AtlasVertex findBySuperTypeAndPropertyName(String typeName, String propertyName, Object attrVal) { MetricRecorder metric = RequestContext.get().startMetricRecord("findBySuperTypeAndPropertyName"); - AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query() + AtlasGraphQuery query = getGraphInstance().query() .has(Constants.SUPER_TYPES_PROPERTY_KEY, typeName) .has(propertyName, attrVal) - .has(Constants.STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name()); + .has(STATE_PROPERTY_KEY, AtlasEntity.Status.ACTIVE.name()); Iterator<AtlasVertex> results = query.vertices().iterator(); @@ -467,9 +471,9 @@ public class AtlasGraphUtilsV2 { return vertex; } - public static Map<String, PatchStatus> initPatchesRegistry() { - Map<String, PatchStatus> ret = new HashMap<>(); - AtlasPatches patches = getPatches(); + public static Map<String, PatchStatus> getPatchesRegistry() { + Map<String, PatchStatus> ret = new HashMap<>(); + AtlasPatches patches = getAllPatches(); for (AtlasPatch patch : patches.getPatches()) { String patchId = patch.getId(); @@ -483,7 +487,7 @@ public class AtlasGraphUtilsV2 { return ret; } - public static AtlasPatches getPatches() { + public static AtlasPatches getAllPatches() { List<AtlasPatch> ret = new ArrayList<>(); String idxQueryString = getIndexSearchPrefix() + "\"" + PATCH_ID_PROPERTY_KEY + "\" : (*)"; AtlasIndexQuery idxQuery = AtlasGraphProvider.getGraphInstance().indexQuery(VERTEX_INDEX, idxQueryString); @@ -534,8 +538,8 @@ public class AtlasGraphUtilsV2 { } public static List<String> findEntityGUIDsByType(String typename, SortOrder sortOrder) { - AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance().query() - .has(Constants.ENTITY_TYPE_PROPERTY_KEY, typename); + AtlasGraphQuery query = getGraphInstance().query() + .has(ENTITY_TYPE_PROPERTY_KEY, typename); if (sortOrder != null) { AtlasGraphQuery.SortOrder qrySortOrder = sortOrder == SortOrder.ASCENDING ? ASC : DESC; query.orderBy(Constants.QUALIFIED_NAME, qrySortOrder); @@ -555,14 +559,22 @@ public class AtlasGraphUtilsV2 { return ret; } + public static Iterator<AtlasVertex> findActiveEntityVerticesByType(String typename) { + AtlasGraphQuery query = getGraphInstance().query() + .has(ENTITY_TYPE_PROPERTY_KEY, typename) + .has(STATE_PROPERTY_KEY, Status.ACTIVE.name()); + + return query.vertices().iterator(); + } + public static List<String> findEntityGUIDsByType(String typename) { return findEntityGUIDsByType(typename, null); } public static boolean relationshipTypeHasInstanceEdges(String typeName) throws AtlasBaseException { - AtlasGraphQuery query = AtlasGraphProvider.getGraphInstance() + AtlasGraphQuery query = getGraphInstance() .query() - .has(Constants.TYPE_NAME_PROPERTY_KEY, AtlasGraphQuery.ComparisionOperator.EQUAL, typeName); + .has(TYPE_NAME_PROPERTY_KEY, AtlasGraphQuery.ComparisionOperator.EQUAL, typeName); Iterator<AtlasEdge> results = query.edges().iterator(); @@ -626,7 +638,7 @@ public class AtlasGraphUtilsV2 { } public static String getStateAsString(AtlasElement element) { - return element.getProperty(Constants.STATE_PROPERTY_KEY, String.class); + return element.getProperty(STATE_PROPERTY_KEY, String.class); } private static boolean canUseIndexQuery(AtlasEntityType entityType, String attributeName) { @@ -638,7 +650,7 @@ public class AtlasGraphUtilsV2 { ret = typeAndSubTypesQryStr.length() <= SearchProcessor.MAX_QUERY_STR_LENGTH_TYPES; if (ret) { - Set<String> indexSet = AtlasGraphProvider.getGraphInstance().getVertexIndexKeys(); + Set<String> indexSet = getGraphInstance().getVertexIndexKeys(); try { ret = indexSet.contains(entityType.getQualifiedAttributeName(attributeName)); } @@ -693,13 +705,13 @@ public class AtlasGraphUtilsV2 { private static AtlasIndexQuery getIndexQuery(AtlasEntityType entityType, String propertyName, String value) { StringBuilder sb = new StringBuilder(); - sb.append(INDEX_SEARCH_PREFIX + "\"").append(Constants.TYPE_NAME_PROPERTY_KEY).append("\":").append(entityType.getTypeAndAllSubTypesQryStr()) + sb.append(INDEX_SEARCH_PREFIX + "\"").append(TYPE_NAME_PROPERTY_KEY).append("\":").append(entityType.getTypeAndAllSubTypesQryStr()) .append(" AND ") .append(INDEX_SEARCH_PREFIX + "\"").append(propertyName).append("\":").append(AtlasAttribute.escapeIndexQueryValue(value)) .append(" AND ") - .append(INDEX_SEARCH_PREFIX + "\"").append(Constants.STATE_PROPERTY_KEY).append("\":ACTIVE"); + .append(INDEX_SEARCH_PREFIX + "\"").append(STATE_PROPERTY_KEY).append("\":ACTIVE"); - return AtlasGraphProvider.getGraphInstance().indexQuery(Constants.VERTEX_INDEX, sb.toString()); + return getGraphInstance().indexQuery(Constants.VERTEX_INDEX, sb.toString()); } public static String getIndexSearchPrefix() { diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java index 01bdcf7..c5ceb9d 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java @@ -564,7 +564,7 @@ public class AdminResource { LOG.debug("==> AdminResource.getAtlasPatches()"); } - AtlasPatches ret = AtlasGraphUtilsV2.getPatches(); + AtlasPatches ret = AtlasGraphUtilsV2.getAllPatches(); if (LOG.isDebugEnabled()) { LOG.debug("<== AdminResource.getAtlasPatches()");