This is an automated email from the ASF dual-hosted git repository. pinal pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new c7c6f36 ATLAS-4447 : Update composite indexes status c7c6f36 is described below commit c7c6f3699ab2ffe3fbebfbb2885e3dd1c997c345 Author: Pinal Shah <pinal.s...@freestoneinfotech.com> AuthorDate: Tue Oct 12 14:20:09 2021 +0530 ATLAS-4447 : Update composite indexes status Signed-off-by: Pinal Shah <pinal.s...@freestoneinfotech.com> --- .../repository/graphdb/AtlasGraphManagement.java | 5 ++ .../graphdb/janus/AtlasJanusGraphManagement.java | 59 +++++++++++++++++++++ .../java/org/apache/atlas/AtlasConfiguration.java | 3 +- .../repository/patches/AtlasPatchManager.java | 1 + .../patches/UpdateCompositeIndexStatusPatch.java | 61 ++++++++++++++++++++++ 5 files changed, 128 insertions(+), 1 deletion(-) diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java index 50d17a2..8b67da9 100644 --- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java +++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java @@ -173,6 +173,11 @@ public interface AtlasGraphManagement { */ void updateUniqueIndexesForConsistencyLock(); + /** + * update valid SchemaStatus for all vertex and edge indexes. + */ + void updateSchemaStatus(); + /*** * Re-index elements. * @param indexName: Name of the index that needs to be operated on. diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java index b3eb071..e7de830 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java @@ -45,10 +45,12 @@ import org.janusgraph.core.schema.JanusGraphManagement.IndexBuilder; import org.janusgraph.core.schema.Mapping; import org.janusgraph.core.schema.Parameter; import org.janusgraph.core.schema.PropertyKeyMaker; +import org.janusgraph.core.schema.SchemaStatus; import org.janusgraph.diskstorage.BackendTransaction; import org.janusgraph.diskstorage.indexing.IndexEntry; import org.janusgraph.graphdb.database.IndexSerializer; import org.janusgraph.graphdb.database.StandardJanusGraph; +import org.janusgraph.graphdb.database.management.GraphIndexStatusReport; import org.janusgraph.graphdb.database.management.ManagementSystem; import org.janusgraph.graphdb.internal.Token; import org.janusgraph.graphdb.log.StandardTransactionLogProcessor; @@ -64,6 +66,12 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; + +import static org.janusgraph.core.schema.SchemaAction.ENABLE_INDEX; +import static org.janusgraph.core.schema.SchemaStatus.ENABLED; +import static org.janusgraph.core.schema.SchemaStatus.INSTALLED; +import static org.janusgraph.core.schema.SchemaStatus.REGISTERED; /** * Janus implementation of AtlasGraphManagement. @@ -307,6 +315,57 @@ public class AtlasJanusGraphManagement implements AtlasGraphManagement { } } + @Override + public void updateSchemaStatus() { + updateSchemaStatus(this.management, this.graph.getGraph(), Vertex.class); + updateSchemaStatus(this.management, this.graph.getGraph(), Edge.class); + } + + public static void updateSchemaStatus(JanusGraphManagement mgmt, JanusGraph graph, Class<? extends Element> elementType) { + LOG.info("updating SchemaStatus for {}: Starting...", elementType.getSimpleName()); + int count = 0; + + Iterable<JanusGraphIndex> iterable = mgmt.getGraphIndexes(elementType); + + for (JanusGraphIndex index : iterable) { + + if (index.isCompositeIndex()) { + PropertyKey[] propertyKeys = index.getFieldKeys(); + SchemaStatus status = index.getIndexStatus(propertyKeys[0]); + String indexName = index.name(); + + try { + if (status == REGISTERED) { + JanusGraphManagement management = graph.openManagement(); + JanusGraphIndex indexToUpdate = management.getGraphIndex(indexName); + management.updateIndex(indexToUpdate, ENABLE_INDEX).get(); + management.commit(); + + GraphIndexStatusReport report = ManagementSystem.awaitGraphIndexStatus(graph, indexName).status(ENABLED).call(); + + if (!report.getConvergedKeys().isEmpty() && report.getConvergedKeys().containsKey(indexName)) { + LOG.info("SchemaStatus updated for index: {}, from {} to {}.", index.name(), REGISTERED, ENABLED); + + count++; + } else if (!report.getNotConvergedKeys().isEmpty() && report.getNotConvergedKeys().containsKey(indexName)) { + LOG.error("SchemaStatus failed to update index: {}, from {} to {}.", index.name(), REGISTERED, ENABLED); + } + + } else if (status == INSTALLED) { + LOG.warn("SchemaStatus {} found for index: {}", INSTALLED, indexName); + + } + } catch (InterruptedException e) { + LOG.error("IllegalStateException for indexName : {}, Exception: ", indexName, e); + } catch (ExecutionException e) { + LOG.error("ExecutionException for indexName : {}, Exception: ", indexName, e); + } + } + } + + LOG.info("updating SchemaStatus for {}: {}: Done!", elementType.getSimpleName(), count); + } + private static void setConsistency(JanusGraphManagement mgmt, Class<? extends Element> elementType) { LOG.info("setConsistency: {}: Starting...", elementType.getSimpleName()); int count = 0; diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index 20f8f73..b63fab7 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -81,7 +81,8 @@ public enum AtlasConfiguration { DSL_CACHED_TRANSLATOR("atlas.dsl.cached.translator", true), DEBUG_METRICS_ENABLED("atlas.debug.metrics.enabled", false), TASKS_USE_ENABLED("atlas.tasks.enabled", true), - SESSION_TIMEOUT_SECS("atlas.session.timeout.secs", -1); + SESSION_TIMEOUT_SECS("atlas.session.timeout.secs", -1), + UPDATE_COMPOSITE_INDEX_STATUS("atlas.update.composite.index.status", true); private static final Configuration APPLICATION_PROPERTIES; diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java index d30971a..44cd8ef 100644 --- a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java +++ b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java @@ -95,6 +95,7 @@ public class AtlasPatchManager { handlers.add(new IndexConsistencyPatch(context)); handlers.add(new ReIndexPatch(context)); handlers.add(new ProcessNamePatch(context)); + handlers.add(new UpdateCompositeIndexStatusPatch(context)); LOG.info("<== AtlasPatchManager.init()"); } diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java new file mode 100644 index 0000000..12a2b71 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.patches; + +import org.apache.atlas.AtlasConfiguration; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN; + +public class UpdateCompositeIndexStatusPatch extends AtlasPatchHandler { + private static final Logger LOG = LoggerFactory.getLogger(UpdateCompositeIndexStatusPatch.class); + + private static final String PATCH_ID = "JAVA_PATCH_0000_010"; + private static final String PATCH_DESCRIPTION = "updates schema status of composite indexes which are in REGISTERED to ENABLED."; + + private final PatchContext context; + + public UpdateCompositeIndexStatusPatch(PatchContext context) { + super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION); + this.context = context; + } + + @Override + public void apply() throws AtlasBaseException { + if (!AtlasConfiguration.UPDATE_COMPOSITE_INDEX_STATUS.getBoolean()) { + LOG.info("UpdateCompositeIndexStatusPatch: Skipped, since not enabled!"); + return; + } + + AtlasGraph graph = context.getGraph(); + + try { + LOG.info("UpdateCompositeIndexStatusPatch: Starting..."); + graph.getManagementSystem().updateSchemaStatus(); + } finally { + LOG.info("UpdateCompositeIndexStatusPatch: Done!"); + } + + setStatus(UNKNOWN); + + LOG.info("UpdateCompositeIndexStatusPatch.apply(): patchId={}, status={}", getPatchId(), getStatus()); + } +}