This is an automated email from the ASF dual-hosted git repository. sarath pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new 261331b ATLAS-4408: Dynamic handling of failure in updating index 261331b is described below commit 261331bbd1fa8e50517421ab834f01f89f4997e1 Author: Radhika Kundam <rkun...@cloudera.com> AuthorDate: Sun Oct 10 21:50:29 2021 -0700 ATLAS-4408: Dynamic handling of failure in updating index --- .../org/apache/atlas/repository/Constants.java | 8 + .../repository/graphdb/AtlasGraphIndexClient.java | 6 + .../repository/graphdb/AtlasGraphManagement.java | 19 ++ .../graphdb/janus/AtlasJanusGraphDatabase.java | 74 +++++- .../graphdb/janus/AtlasJanusGraphIndexClient.java | 55 +++- .../graphdb/janus/AtlasJanusGraphManagement.java | 85 ++++++- .../org/apache/atlas/ApplicationProperties.java | 2 + .../repository/graph/GraphBackedSearchIndexer.java | 3 + .../repository/graph/IndexRecoveryService.java | 283 +++++++++++++++++++++ .../graph/RecoveryInfoManagementTest.java | 65 +++++ .../atlas/listener/ActiveStateChangeHandler.java | 3 +- 11 files changed, 575 insertions(+), 28 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index aea0c13..2669c8a 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -228,6 +228,14 @@ public final class Constants { public static final String TASK_START_TIME = encodePropertyKey(TASK_PREFIX + "startTime"); public static final String TASK_END_TIME = encodePropertyKey(TASK_PREFIX + "endTime"); + /** + * Index Recovery vertex property keys. + */ + public static final String INDEX_RECOVERY_PREFIX = INTERNAL_PROPERTY_KEY_PREFIX + "idxRecovery_"; + public static final String PROPERTY_KEY_INDEX_RECOVERY_NAME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "name"); + public static final String PROPERTY_KEY_INDEX_RECOVERY_START_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "startTime"); + public static final String PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME = encodePropertyKey(INDEX_RECOVERY_PREFIX + "prevTime"); + /* * All supported file-format extensions for Bulk Imports through file upload */ diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphIndexClient.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphIndexClient.java index 9960d89..54cf5f6 100644 --- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphIndexClient.java +++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphIndexClient.java @@ -54,4 +54,10 @@ public interface AtlasGraphIndexClient { * @param suggestionProperties the list of suggestion properties. */ void applySuggestionFields(String collectionName, List<String> suggestionProperties); + + /** + * Returns status of index client + * @return returns true if index client is active + */ + boolean isHealthy(); } diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java index 7e3b2f4..50d17a2 100644 --- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java +++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java @@ -180,4 +180,23 @@ public interface AtlasGraphManagement { * @throws Exception */ void reindex(String indexName, List<AtlasElement> elements) throws Exception; + + /** + * Starts recovering indices from the specified recovery time and returns TransactionRecovery + * @param startTime + * @return transactionRecoveryObject + */ + Object startIndexRecovery(long startTime); + + /** + * Stop index recovery. + * @param txRecoveryObject + */ + void stopIndexRecovery(Object txRecoveryObject); + + /** + * Print index recovery stats. + * @param txRecoveryObject + */ + void printIndexRecoveryStats(Object txRecoveryObject); } diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java index 11267c4..0d47e38 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java @@ -28,7 +28,6 @@ import org.apache.atlas.repository.graphdb.janus.serializer.BigDecimalSerializer import org.apache.atlas.repository.graphdb.janus.serializer.BigIntegerSerializer; import org.apache.atlas.repository.graphdb.janus.serializer.TypeCategorySerializer; import org.apache.atlas.typesystem.types.DataTypes.TypeCategory; -import org.apache.atlas.utils.AtlasPerfTracer; import org.apache.commons.configuration.Configuration; import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper; import org.janusgraph.core.JanusGraph; @@ -48,28 +47,36 @@ import java.lang.reflect.Method; import java.lang.reflect.Modifier; import java.math.BigDecimal; import java.math.BigInteger; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import static org.apache.atlas.ApplicationProperties.DEFAULT_INDEX_RECOVERY; +import static org.apache.atlas.ApplicationProperties.INDEX_RECOVERY_CONF; + /** * Default implementation for Graph Provider that doles out JanusGraph. */ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, AtlasJanusEdge> { private static final Logger LOG = LoggerFactory.getLogger(AtlasJanusGraphDatabase.class); - private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("AtlasJanusGraphDatabase"); private static final String OLDER_STORAGE_EXCEPTION = "Storage version is incompatible with current client"; /** * Constant for the configuration property that indicates the prefix. */ - public static final String GRAPH_PREFIX = "atlas.graph"; - public static final String INDEX_BACKEND_CONF = "index.search.backend"; - public static final String SOLR_ZOOKEEPER_URL = "atlas.graph.index.search.solr.zookeeper-url"; - public static final String SOLR_ZOOKEEPER_URLS = "atlas.graph.index.search.solr.zookeeper-urls"; - public static final String INDEX_BACKEND_LUCENE = "lucene"; - public static final String INDEX_BACKEND_ES = "elasticsearch"; + public static final String GRAPH_PREFIX = "atlas.graph"; + public static final String INDEX_BACKEND_CONF = "index.search.backend"; + public static final String SOLR_ZOOKEEPER_URL = "atlas.graph.index.search.solr.zookeeper-url"; + public static final String SOLR_ZOOKEEPER_URLS = "atlas.graph.index.search.solr.zookeeper-urls"; + public static final String INDEX_BACKEND_LUCENE = "lucene"; + public static final String INDEX_BACKEND_ES = "elasticsearch"; + public static final String GRAPH_TX_LOG_CONF = "tx.log-tx"; + public static final String GRAPH_TX_LOG_VERBOSE_CONF = "tx.recovery.verbose"; + public static final String SOLR_INDEX_TX_LOG_TTL_CONF = "write.ahead.log.ttl.in.hours"; + public static final String GRAPH_TX_LOG_TTL_CONF = "log.tx.ttl"; + public static final long DEFAULT_GRAPH_TX_LOG_TTL = 72; //Hrs private static volatile AtlasJanusGraph atlasGraphInstance = null; private static volatile JanusGraph graphInstance; @@ -166,8 +173,11 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, throw new RuntimeException(e); } - graphInstance = initJanusGraph(config); + configureTxLogBasedIndexRecovery(); + + graphInstance = initJanusGraph(config); atlasGraphInstance = new AtlasJanusGraph(); + validateIndexBackend(config); } @@ -192,6 +202,52 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, } } + public static void configureTxLogBasedIndexRecovery() { + try { + boolean recoveryEnabled = ApplicationProperties.get().getBoolean(INDEX_RECOVERY_CONF, DEFAULT_INDEX_RECOVERY); + long ttl = ApplicationProperties.get().getLong(SOLR_INDEX_TX_LOG_TTL_CONF, DEFAULT_GRAPH_TX_LOG_TTL); + Duration txLogTtlSecs = Duration.ofSeconds(Duration.ofHours(ttl).getSeconds()); + + Map<String, Object> properties = new HashMap<String, Object>() {{ + put(GRAPH_TX_LOG_CONF, recoveryEnabled); + put(GRAPH_TX_LOG_VERBOSE_CONF, recoveryEnabled); + put(GRAPH_TX_LOG_TTL_CONF, txLogTtlSecs); + }}; + + updateGlobalConfiguration(properties); + + LOG.info("Tx Log-based Index Recovery: {}!", recoveryEnabled ? "Enabled" : "Disabled"); + } catch (Exception e) { + LOG.error("Error: Failed!", e); + } + } + + private static void updateGlobalConfiguration(Map<String, Object> map) { + JanusGraph graph = null; + JanusGraphManagement managementSystem = null; + + try { + graph = initJanusGraph(getConfiguration()); + managementSystem = graph.openManagement(); + + for (Map.Entry<String, Object> entry : map.entrySet()) { + managementSystem.set(entry.getKey(), entry.getValue()); + } + + LOG.info("Global properties updated!: {}", map); + } catch (Exception ex) { + LOG.error("Error updating global configuration: {}", map, ex); + } finally { + if (managementSystem != null) { + managementSystem.commit(); + } + + if (graph != null) { + graph.close(); + } + } + } + public static JanusGraph getBulkLoadingGraphInstance() { try { Configuration cfg = getConfiguration(); diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java index ef42dbd..9e9fdd8 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java @@ -40,16 +40,22 @@ import org.apache.solr.client.solrj.request.V2Request; import org.apache.solr.client.solrj.response.FacetField; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.client.solrj.response.TermsResponse; -import org.apache.solr.client.solrj.util.ClientUtils; import org.apache.solr.common.params.CommonParams; -import org.apache.solr.common.util.ContentStream; import org.apache.solr.common.util.NamedList; import org.janusgraph.diskstorage.solr.Solr6Index; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; import static org.apache.atlas.repository.Constants.FREETEXT_REQUEST_HANDLER; import static org.apache.atlas.repository.Constants.VERTEX_INDEX; @@ -57,11 +63,15 @@ import static org.apache.atlas.repository.Constants.VERTEX_INDEX; public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient { private static final Logger LOG = LoggerFactory.getLogger(AtlasJanusGraphIndexClient.class); - private static final FreqComparator FREQ_COMPARATOR = new FreqComparator(); - private static final int DEFAULT_SUGGESTION_COUNT = 5; - private static final int MIN_FACET_COUNT_REQUIRED = 1; - private static final String TERMS_PREFIX = "terms.prefix"; - private static final String TERMS_FIELD = "terms.fl"; + private static final FreqComparator FREQ_COMPARATOR = new FreqComparator(); + private static final int DEFAULT_SUGGESTION_COUNT = 5; + private static final int MIN_FACET_COUNT_REQUIRED = 1; + private static final String TERMS_PREFIX = "terms.prefix"; + private static final String TERMS_FIELD = "terms.fl"; + private static final int SOLR_HEALTHY_STATUS = 0; + private static final long SOLR_STATUS_LOG_FREQUENCY_MS = 60000;//Prints SOLR DOWN status for every 1 min + private static long prevSolrHealthCheckTime; + private final Configuration configuration; @@ -70,6 +80,29 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient { this.configuration = configuration; } + public boolean isHealthy() { + boolean isHealthy = false; + long currentTime = System.currentTimeMillis(); + + try { + if (isSolrHealthy()) { + isHealthy = true; + } + } catch (Exception exception) { + if (LOG.isDebugEnabled()) { + LOG.error("Error: isHealthy", exception); + } + } + + if (!isHealthy && (prevSolrHealthCheckTime == 0 || currentTime - prevSolrHealthCheckTime > SOLR_STATUS_LOG_FREQUENCY_MS)) { + LOG.info("Solr Health: Unhealthy!"); + + prevSolrHealthCheckTime = currentTime; + } + + return isHealthy; + } + @Override public void applySearchWeight(String collectionName, Map<String, Integer> indexFieldName2SearchWeightMap) { SolrClient solrClient = null; @@ -340,6 +373,12 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient { return Collections.EMPTY_LIST; } + private boolean isSolrHealthy() throws SolrServerException, IOException { + SolrClient client = Solr6Index.getSolrClient(); + + return client != null && client.ping(Constants.VERTEX_INDEX).getStatus() == SOLR_HEALTHY_STATUS; + } + private void graphManagementCommit(AtlasGraphManagement management) { try { management.commit(); diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java index 1cc7f8b..b3eb071 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java @@ -19,37 +19,46 @@ package org.apache.atlas.repository.graphdb.janus; import com.google.common.base.Preconditions; import org.apache.atlas.AtlasConfiguration; -import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.repository.graphdb.AtlasCardinality; import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; +import org.apache.atlas.repository.graphdb.AtlasEdgeLabel; import org.apache.atlas.repository.graphdb.AtlasElement; +import org.apache.atlas.repository.graphdb.AtlasGraphIndex; +import org.apache.atlas.repository.graphdb.AtlasGraphManagement; +import org.apache.atlas.repository.graphdb.AtlasPropertyKey; +import org.apache.commons.lang.StringUtils; import org.apache.tinkerpop.gremlin.structure.Direction; +import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Element; +import org.apache.tinkerpop.gremlin.structure.Vertex; import org.janusgraph.core.Cardinality; import org.janusgraph.core.EdgeLabel; +import org.janusgraph.core.JanusGraph; import org.janusgraph.core.JanusGraphElement; +import org.janusgraph.core.JanusGraphFactory; import org.janusgraph.core.PropertyKey; -import org.janusgraph.core.schema.*; +import org.janusgraph.core.log.TransactionRecovery; +import org.janusgraph.core.schema.ConsistencyModifier; +import org.janusgraph.core.schema.JanusGraphIndex; +import org.janusgraph.core.schema.JanusGraphManagement; import org.janusgraph.core.schema.JanusGraphManagement.IndexBuilder; +import org.janusgraph.core.schema.Mapping; +import org.janusgraph.core.schema.Parameter; +import org.janusgraph.core.schema.PropertyKeyMaker; import org.janusgraph.diskstorage.BackendTransaction; import org.janusgraph.diskstorage.indexing.IndexEntry; import org.janusgraph.graphdb.database.IndexSerializer; import org.janusgraph.graphdb.database.StandardJanusGraph; import org.janusgraph.graphdb.database.management.ManagementSystem; import org.janusgraph.graphdb.internal.Token; -import org.apache.atlas.repository.graphdb.AtlasCardinality; -import org.apache.atlas.repository.graphdb.AtlasEdgeLabel; -import org.apache.atlas.repository.graphdb.AtlasGraphIndex; -import org.apache.atlas.repository.graphdb.AtlasGraphManagement; -import org.apache.atlas.repository.graphdb.AtlasPropertyKey; -import org.apache.commons.lang.StringUtils; -import org.apache.tinkerpop.gremlin.structure.Edge; -import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.janusgraph.graphdb.log.StandardTransactionLogProcessor; import org.janusgraph.graphdb.transaction.StandardJanusGraphTx; import org.janusgraph.graphdb.types.IndexType; import org.janusgraph.graphdb.types.MixedIndexType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Instant; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -347,6 +356,62 @@ public class AtlasJanusGraphManagement implements AtlasGraphManagement { } } + @Override + public Object startIndexRecovery(long recoveryStartTime) { + Instant recoveryStartInstant = Instant.ofEpochMilli(recoveryStartTime); + JanusGraph janusGraph = this.graph.getGraph(); + + return JanusGraphFactory.startTransactionRecovery(janusGraph, recoveryStartInstant); + } + + @Override + public void stopIndexRecovery(Object txRecoveryObject) { + if (txRecoveryObject == null) { + return; + } + + try { + if (txRecoveryObject instanceof TransactionRecovery) { + TransactionRecovery txRecovery = (TransactionRecovery) txRecoveryObject; + StandardJanusGraph janusGraph = (StandardJanusGraph) this.graph.getGraph(); + + LOG.info("stopIndexRecovery: Index Client is unhealthy. Index recovery: Paused!"); + + janusGraph.getBackend().getSystemTxLog().close(); + + txRecovery.shutdown(); + } else { + LOG.error("stopIndexRecovery({}): Invalid transaction recovery object!", txRecoveryObject); + } + } catch (Exception e) { + LOG.warn("stopIndexRecovery: Error while shutting down transaction recovery", e); + } + } + + @Override + public void printIndexRecoveryStats(Object txRecoveryObject) { + if (txRecoveryObject == null) { + return; + } + + try { + if (txRecoveryObject instanceof TransactionRecovery) { + StandardTransactionLogProcessor txRecovery = (StandardTransactionLogProcessor) txRecoveryObject; + long[] statistics = txRecovery.getStatistics(); + + if (statistics.length >= 2) { + LOG.info("Index Recovery: Stats: Success:{}: Failed: {}", statistics[0], statistics[1]); + } else { + LOG.info("Index Recovery: Stats: {}", statistics); + } + } else { + LOG.error("Transaction stats: Invalid transaction recovery object!: Unexpected type: {}: Details: {}", txRecoveryObject.getClass().toString(), txRecoveryObject); + } + } catch (Exception e) { + LOG.error("Error: Retrieving log transaction stats!", e); + } + } + private void reindexElement(ManagementSystem managementSystem, IndexSerializer indexSerializer, MixedIndexType indexType, List<AtlasElement> elements) throws Exception { Map<String, Map<String, List<IndexEntry>>> documentsPerStore = new HashMap<>(); StandardJanusGraphTx tx = managementSystem.getWrappedTx(); diff --git a/intg/src/main/java/org/apache/atlas/ApplicationProperties.java b/intg/src/main/java/org/apache/atlas/ApplicationProperties.java index 682206d..4a37c77 100644 --- a/intg/src/main/java/org/apache/atlas/ApplicationProperties.java +++ b/intg/src/main/java/org/apache/atlas/ApplicationProperties.java @@ -50,6 +50,7 @@ public final class ApplicationProperties extends PropertiesConfiguration { public static final String INDEX_BACKEND_CONF = "atlas.graph.index.search.backend"; public static final String INDEX_MAP_NAME_CONF = "atlas.graph.index.search.map-name"; public static final String SOLR_WAIT_SEARCHER_CONF = "atlas.graph.index.search.solr.wait-searcher"; + public static final String INDEX_RECOVERY_CONF = "atlas.index.recovery.enable"; public static final String ENABLE_FULLTEXT_SEARCH_CONF = "atlas.search.fulltext.enable"; public static final String ENABLE_FREETEXT_SEARCH_CONF = "atlas.search.freetext.enable"; public static final String ATLAS_RUN_MODE = "atlas.run.mode"; @@ -66,6 +67,7 @@ public final class ApplicationProperties extends PropertiesConfiguration { public static final String DEFAULT_GRAPHDB_BACKEND = GRAPHBD_BACKEND_JANUS; public static final boolean DEFAULT_SOLR_WAIT_SEARCHER = false; public static final boolean DEFAULT_INDEX_MAP_NAME = false; + public static final boolean DEFAULT_INDEX_RECOVERY = true; public static final AtlasRunMode DEFAULT_ATLAS_RUN_MODE = AtlasRunMode.PROD; public static final String INDEX_SEARCH_MAX_RESULT_SET_SIZE = "atlas.graph.index.search.max-result-set-size"; diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java index ddfb008..d65bb1a 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java @@ -353,6 +353,9 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang createCommonVertexIndex(management, TASK_CREATED_TIME, UniqueKind.NONE, Long.class, SINGLE, true, false); createCommonVertexIndex(management, TASK_STATUS, UniqueKind.NONE, String.class, SINGLE, true, false); + // index recovery + createCommonVertexIndex(management, PROPERTY_KEY_INDEX_RECOVERY_NAME, UniqueKind.GLOBAL_UNIQUE, String.class, SINGLE, true, false); + // create vertex-centric index createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, String.class, SINGLE); createVertexCentricIndex(management, CLASSIFICATION_LABEL, AtlasEdgeDirection.BOTH, CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, Boolean.class, SINGLE); diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java new file mode 100644 index 0000000..2f11610 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java @@ -0,0 +1,283 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.graph; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.listener.ActiveStateChangeHandler; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasGraphQuery; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.service.Service; +import org.apache.commons.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.text.SimpleDateFormat; +import java.time.Instant; +import java.util.Iterator; +import java.util.TimeZone; + +import static org.apache.atlas.ApplicationProperties.DEFAULT_INDEX_RECOVERY; +import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_NAME; +import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME; +import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_START_TIME; +import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty; + +@Component +@Order(8) +public class IndexRecoveryService implements Service, ActiveStateChangeHandler { + private static final Logger LOG = LoggerFactory.getLogger(IndexRecoveryService.class); + private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + private static final String INDEX_HEALTH_MONITOR_THREAD_NAME = "index-health-monitor"; + private static final String SOLR_STATUS_CHECK_RETRY_INTERVAL = "atlas.graph.index.status.check.frequency"; + private static final String SOLR_INDEX_RECOVERY_CONFIGURED_START_TIME = "atlas.graph.index.recovery.start.time"; + private static final long SOLR_STATUS_RETRY_DEFAULT_MS = 30000; // 30 secs default + + private final Thread indexHealthMonitor; + private final RecoveryInfoManagement recoveryInfoManagement; + private Configuration configuration; + private boolean isIndexRecoveryEnabled; + + @Inject + public IndexRecoveryService(Configuration config, AtlasGraph graph) { + this.configuration = config; + this.isIndexRecoveryEnabled = config.getBoolean(ApplicationProperties.INDEX_RECOVERY_CONF, DEFAULT_INDEX_RECOVERY); + long recoveryStartTimeFromConfig = getRecoveryStartTimeFromConfig(config); + long healthCheckFrequencyMillis = config.getLong(SOLR_STATUS_CHECK_RETRY_INTERVAL, SOLR_STATUS_RETRY_DEFAULT_MS); + this.recoveryInfoManagement = new RecoveryInfoManagement(graph); + + RecoveryThread recoveryThread = new RecoveryThread(recoveryInfoManagement, graph, recoveryStartTimeFromConfig, healthCheckFrequencyMillis); + this.indexHealthMonitor = new Thread(recoveryThread, INDEX_HEALTH_MONITOR_THREAD_NAME); + } + + private long getRecoveryStartTimeFromConfig(Configuration config) { + long ret = 0L; + + try { + String time = config.getString(SOLR_INDEX_RECOVERY_CONFIGURED_START_TIME); + + SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT); + dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + + ret = dateFormat.parse(time).toInstant().toEpochMilli(); + } catch (Exception e) { + LOG.debug("Error fetching: {}", SOLR_INDEX_RECOVERY_CONFIGURED_START_TIME); + } + + return ret; + } + + @Override + public void start() throws AtlasException { + if (configuration == null || !HAConfiguration.isHAEnabled(configuration)) { + LOG.info("==> IndexRecoveryService.start()"); + + startTxLogMonitoring(); + + LOG.info("<== IndexRecoveryService.start()"); + } + } + + @Override + public void stop() throws AtlasException { + try { + indexHealthMonitor.join(); + } catch (InterruptedException e) { + LOG.error("indexHealthMonitor: Interrupted", e); + } + } + + @Override + public void instanceIsActive() throws AtlasException { + LOG.info("==> IndexRecoveryService.instanceIsActive()"); + + startTxLogMonitoring(); + + LOG.info("<== IndexRecoveryService.instanceIsActive()"); + } + + @Override + public void instanceIsPassive() throws AtlasException { + LOG.info("IndexRecoveryService.instanceIsPassive(): no action needed."); + } + + @Override + public int getHandlerOrder() { + return ActiveStateChangeHandler.HandlerOrder.INDEX_RECOVERY.getOrder(); + } + + private void startTxLogMonitoring() { + if (!isIndexRecoveryEnabled) { + LOG.warn("IndexRecoveryService: Recovery should be enabled."); + + return; + } + + indexHealthMonitor.start(); + } + + private static class RecoveryThread implements Runnable { + private final AtlasGraph graph; + private final RecoveryInfoManagement recoveryInfoManagement; + private long indexStatusCheckRetryMillis; + private Object txRecoveryObject; + + private RecoveryThread(RecoveryInfoManagement recoveryInfoManagement, AtlasGraph graph, long startTimeFromConfig, long healthCheckFrequencyMillis) { + this.graph = graph; + this.recoveryInfoManagement = recoveryInfoManagement; + this.indexStatusCheckRetryMillis = healthCheckFrequencyMillis; + + if (startTimeFromConfig > 0) { + this.recoveryInfoManagement.updateStartTime(startTimeFromConfig); + } + } + + public void run() { + LOG.info("Index Health Monitor: Starting..."); + + while (true) { + try { + boolean solrHealthy = isSolrHealthy(); + + if (this.txRecoveryObject == null && solrHealthy) { + startMonitoring(); + } + + if (this.txRecoveryObject != null && !solrHealthy) { + stopMonitoring(); + } + } catch (Exception e) { + LOG.error("Error: Index recovery monitoring!", e); + } + } + } + + private boolean isSolrHealthy() throws AtlasException, InterruptedException { + Thread.sleep(indexStatusCheckRetryMillis); + + return this.graph.getGraphIndexClient().isHealthy(); + } + + private void startMonitoring() { + Long startTime = null; + + try { + startTime = recoveryInfoManagement.getStartTime(); + txRecoveryObject = this.graph.getManagementSystem().startIndexRecovery(startTime); + + printIndexRecoveryStats(); + } catch (Exception e) { + LOG.error("Index Recovery: Start: Error!", e); + } finally { + LOG.info("Index Recovery: Started! Recovery time: {}", Instant.ofEpochMilli(startTime)); + } + } + + private void stopMonitoring() { + Instant newStartTime = Instant.now().minusMillis(indexStatusCheckRetryMillis); + + try { + this.graph.getManagementSystem().stopIndexRecovery(txRecoveryObject); + + recoveryInfoManagement.updateStartTime(newStartTime.toEpochMilli()); + + printIndexRecoveryStats(); + } catch (Exception e) { + LOG.info("Index Recovery: Stopped! Error!", e); + } finally { + this.txRecoveryObject = null; + + LOG.info("Index Recovery: Stopped! Recovery time: {}", newStartTime); + } + } + + private void printIndexRecoveryStats() { + this.graph.getManagementSystem().printIndexRecoveryStats(txRecoveryObject); + } + } + + @VisibleForTesting + static class RecoveryInfoManagement { + private static final String INDEX_RECOVERY_TYPE_NAME = "__solrIndexRecoveryInfo"; + + private final AtlasGraph graph; + + public RecoveryInfoManagement(AtlasGraph graph) { + this.graph = graph; + } + + public void updateStartTime(Long startTime) { + try { + Long prevStartTime = null; + AtlasVertex vertex = findVertex(); + + if (vertex == null) { + vertex = graph.addVertex(); + } else { + prevStartTime = getStartTime(vertex); + } + + setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_NAME, INDEX_RECOVERY_TYPE_NAME); + setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_START_TIME, startTime); + setEncodedProperty(vertex, PROPERTY_KEY_INDEX_RECOVERY_PREV_TIME, prevStartTime); + + } catch (Exception ex) { + LOG.error("Error: Updating: {}!", ex); + } finally { + graph.commit(); + } + } + + public Long getStartTime() { + AtlasVertex vertex = findVertex(); + + return getStartTime(vertex); + } + + private Long getStartTime(AtlasVertex vertex) { + if (vertex == null) { + LOG.warn("Vertex passed is NULL: Returned is 0"); + + return 0L; + } + + Long startTime = 0L; + + try { + startTime = vertex.getProperty(PROPERTY_KEY_INDEX_RECOVERY_START_TIME, Long.class); + } catch (Exception e) { + LOG.error("Error retrieving startTime", e); + } + + return startTime; + } + + private AtlasVertex findVertex() { + AtlasGraphQuery query = graph.query().has(PROPERTY_KEY_INDEX_RECOVERY_NAME, INDEX_RECOVERY_TYPE_NAME); + Iterator<AtlasVertex> results = query.vertices().iterator(); + + return results.hasNext() ? results.next() : null; + } + } +} \ No newline at end of file diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/RecoveryInfoManagementTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/RecoveryInfoManagementTest.java new file mode 100644 index 0000000..d0382f2 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/repository/graph/RecoveryInfoManagementTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.graph; + +import com.google.inject.Inject; +import org.apache.atlas.RequestContext; +import org.apache.atlas.TestModules; +import org.apache.atlas.TestUtilsV2; +import org.apache.atlas.repository.AtlasTestBase; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +@Guice(modules = TestModules.TestOnlyModule.class) +public class RecoveryInfoManagementTest extends AtlasTestBase { + + @Inject + private AtlasGraph atlasGraph; + + + @BeforeTest + public void setupTest() { + RequestContext.clear(); + RequestContext.get().setUser(TestUtilsV2.TEST_USER, null); + } + + @BeforeClass + public void initialize() throws Exception { + super.initialize(); + } + + @AfterClass + public void cleanup() throws Exception { + super.cleanup(); + } + + @Test + public void verifyCreateUpdate() { + IndexRecoveryService.RecoveryInfoManagement rm = new IndexRecoveryService.RecoveryInfoManagement(atlasGraph); + long now = System.currentTimeMillis(); + rm.updateStartTime(now); + + long storedTime = rm.getStartTime(); + Assert.assertEquals(now, storedTime); + } +} diff --git a/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java index ba8f088..2916d6b 100644 --- a/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java +++ b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java @@ -33,7 +33,8 @@ public interface ActiveStateChangeHandler { ATLAS_PATCH_SERVICE(3), DEFAULT_METADATA_SERVICE(4), NOTIFICATION_HOOK_CONSUMER(5), - TASK_MANAGEMENT(6); + TASK_MANAGEMENT(6), + INDEX_RECOVERY(7); private final int order;