This is an automated email from the ASF dual-hosted git repository. sidmishra pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 25f84d8 ATLAS-4463: Fixed Infinite loop at Index Health Monitor 25f84d8 is described below commit 25f84d83768011a9dcfeaa59bf67a8a6121433ce Author: Sidharth Mishra <sidharthkmis...@gmail.com> AuthorDate: Thu Oct 28 14:10:07 2021 -0700 ATLAS-4463: Fixed Infinite loop at Index Health Monitor Signed-off-by: Sidharth Mishra <sidmis...@apache.org> (cherry picked from commit 680f23fab55f35cf65a5189ae3882ebe360b1b01) --- .../repository/graph/IndexRecoveryService.java | 28 ++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java index 2f11610..b316354 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java @@ -37,6 +37,7 @@ import java.text.SimpleDateFormat; import java.time.Instant; import java.util.Iterator; import java.util.TimeZone; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.atlas.ApplicationProperties.DEFAULT_INDEX_RECOVERY; import static org.apache.atlas.repository.Constants.PROPERTY_KEY_INDEX_RECOVERY_NAME; @@ -58,6 +59,7 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler { private final RecoveryInfoManagement recoveryInfoManagement; private Configuration configuration; private boolean isIndexRecoveryEnabled; + private RecoveryThread recoveryThread; @Inject public IndexRecoveryService(Configuration config, AtlasGraph graph) { @@ -67,7 +69,7 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler { long healthCheckFrequencyMillis = config.getLong(SOLR_STATUS_CHECK_RETRY_INTERVAL, SOLR_STATUS_RETRY_DEFAULT_MS); this.recoveryInfoManagement = new RecoveryInfoManagement(graph); - RecoveryThread recoveryThread = new RecoveryThread(recoveryInfoManagement, graph, recoveryStartTimeFromConfig, healthCheckFrequencyMillis); + this.recoveryThread = new RecoveryThread(recoveryInfoManagement, graph, recoveryStartTimeFromConfig, healthCheckFrequencyMillis); this.indexHealthMonitor = new Thread(recoveryThread, INDEX_HEALTH_MONITOR_THREAD_NAME); } @@ -102,6 +104,8 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler { @Override public void stop() throws AtlasException { try { + recoveryThread.shutdown(); + indexHealthMonitor.join(); } catch (InterruptedException e) { LOG.error("indexHealthMonitor: Interrupted", e); @@ -143,6 +147,8 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler { private long indexStatusCheckRetryMillis; private Object txRecoveryObject; + private final AtomicBoolean shouldRun = new AtomicBoolean(false); + private RecoveryThread(RecoveryInfoManagement recoveryInfoManagement, AtlasGraph graph, long startTimeFromConfig, long healthCheckFrequencyMillis) { this.graph = graph; this.recoveryInfoManagement = recoveryInfoManagement; @@ -154,9 +160,11 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler { } public void run() { + shouldRun.set(true); + LOG.info("Index Health Monitor: Starting..."); - while (true) { + while (shouldRun.get()) { try { boolean solrHealthy = isSolrHealthy(); @@ -173,6 +181,22 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler { } } + public void shutdown() { + try { + LOG.info("Index Health Monitor: Shutdown: Starting..."); + + // handle the case where thread was not started at all + // and shutdown called + if (shouldRun.get() == false) { + return; + } + + shouldRun.set(false); + } finally { + LOG.info("Index Health Monitor: Shutdown: Done!"); + } + } + private boolean isSolrHealthy() throws AtlasException, InterruptedException { Thread.sleep(indexStatusCheckRetryMillis);