[ 
https://issues.apache.org/jira/browse/HDDS-13956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Devesh Kumar Singh updated HDDS-13956:
--------------------------------------
    Description: 
*Location:* ReconTaskControllerImpl.java

 

 
{code:java}
/**
 * Stops the Recon task controller by calling {@code shutdownNow()} on each
 * executor that is non-null.
 * Quoted as-is to illustrate the problem: the method returns right after
 * requesting shutdown and never calls {@code awaitTermination}.
 */
public synchronized void stop() {
    LOG.info("Stopping Recon Task Controller.");
    if (this.executorService != null) {
        this.executorService.shutdownNow();  // No awaitTermination
    }
    if (this.eventProcessingExecutor != null) {
        this.eventProcessingExecutor.shutdownNow();  // No awaitTermination
    }
}
{code}
 

 

*Impact:* *Service reliability, data integrity* *Likelihood:* *High (every 
service shutdown)*

 

*Java 8 Compatible Fix:*

 

 
{code:java}
private static final int SHUTDOWN_TIMEOUT_SECONDS = 30;
 
public synchronized void stop() {
    LOG.info("Stopping Recon Task Controller.");
    shutdownExecutorGracefully(this.executorService, "main task executor");
    shutdownExecutorGracefully(this.eventProcessingExecutor, "event processing 
executor");
}
 
{code}
 

 
{code:java}
private void shutdownExecutorGracefully(ExecutorService executor, String name) {
    if (executor == null) return;
    
    executor.shutdown();
    try {
        if (!executor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, 
TimeUnit.SECONDS)) {
            LOG.warn("Executor {} did not terminate within {} seconds, forcing 
shutdown", 
                     name, SHUTDOWN_TIMEOUT_SECONDS);
            executor.shutdownNow();
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                LOG.error("Executor {} did not terminate after forced 
shutdown", name);
            }
        }
    } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting for {} to terminate", name);
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
}
{code}
 

*Location:* OzoneManagerServiceProviderImpl.java:

 
{code:java}
// Assuming scheduler is a java.util.concurrent ExecutorService: shutdownNow()
// returns without waiting for running tasks to finish.
scheduler.shutdownNow();  // No awaitTermination
 
{code}
 

*Risk:* Scheduler threads may not terminate, causing resource leaks and 
preventing JVM shutdown *Impact:* *Resource exhaustion, service restart 
failures*

 

*Fix:*

 
{code:java}
private void stopSyncDataFromOMThread() {
    scheduler.shutdown();
    try {
        if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
            scheduler.shutdownNow();
            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                LOG.error("OM sync scheduler failed to terminate");
            }
        }
    } catch (InterruptedException e) {
        scheduler.shutdownNow();
        Thread.currentThread().interrupt();
    }
    tarExtractor.stop();
    LOG.debug("Shutdown the OM DB sync scheduler.");
}
{code}
 

 

 

 

 

  was:
*Location:* ReconTaskControllerImpl.java

 

 
{code:java}
/**
 * Stops the Recon task controller by calling {@code shutdownNow()} on each
 * executor that is non-null.
 * Quoted as-is to illustrate the problem: the method returns right after
 * requesting shutdown and never calls {@code awaitTermination}.
 */
public synchronized void stop() {
    LOG.info("Stopping Recon Task Controller.");
    if (this.executorService != null) {
        this.executorService.shutdownNow();  // No awaitTermination
    }
    if (this.eventProcessingExecutor != null) {
        this.eventProcessingExecutor.shutdownNow();  // No awaitTermination
    }
}
{code}
 

 

*Impact:* *Service reliability, data integrity* *Likelihood:* *High (every 
service shutdown)*

 

*Java 8 Compatible Fix:*

 

 
{code:java}
private static final int SHUTDOWN_TIMEOUT_SECONDS = 30;
 
public synchronized void stop() {
    LOG.info("Stopping Recon Task Controller.");
    shutdownExecutorGracefully(this.executorService, "main task executor");
    shutdownExecutorGracefully(this.eventProcessingExecutor, "event processing 
executor");
}
 
{code}
 

 
{code:java}
private void shutdownExecutorGracefully(ExecutorService executor, String name) {
    if (executor == null) return;
    
    executor.shutdown();
    try {
        if (!executor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, 
TimeUnit.SECONDS)) {
            LOG.warn("Executor {} did not terminate within {} seconds, forcing 
shutdown", 
                     name, SHUTDOWN_TIMEOUT_SECONDS);
            executor.shutdownNow();
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                LOG.error("Executor {} did not terminate after forced 
shutdown", name);
            }
        }
    } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting for {} to terminate", name);
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }
}
{code}
 

*Location:* OzoneManagerServiceProviderImpl.java:

 
{code:java}
// Assuming scheduler is a java.util.concurrent ExecutorService: shutdownNow()
// returns without waiting for running tasks to finish.
scheduler.shutdownNow();  // No awaitTermination
 
{code}
 

*Risk:* Scheduler threads may not terminate, causing resource leaks and 
preventing JVM shutdown *Impact:* *Resource exhaustion, service restart 
failures*

 

*Fix:*

 
{code:java}
private void stopSyncDataFromOMThread() {
    scheduler.shutdown();
    try {
        if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
            scheduler.shutdownNow();
            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                LOG.error("OM sync scheduler failed to terminate");
            }
        }
    } catch (InterruptedException e) {
        scheduler.shutdownNow();
        Thread.currentThread().interrupt();
    }
    tarExtractor.stop();
    LOG.debug("Shutdown the OM DB sync scheduler.");
}
{code}
 

*Location:* ReconTaskControllerImpl.java:

 
{code:java}
/**
 * Polls {@code eventBuffer} for event batches in a loop that runs until the
 * current thread's interrupt flag is set.
 * Quoted to illustrate the problem: the single {@code catch (Exception e)}
 * only logs, so an exception raised inside the loop body does not end the
 * loop.
 */
private void processBufferedEventsAsync() {
    LOG.info("Started async buffered event processing thread");
    
    while (!Thread.currentThread().isInterrupted()) {
        try {
            // NOTE(review): 1000 is presumably a poll timeout in milliseconds;
            // confirm against eventBuffer.poll.
            OMUpdateEventBatch eventBatch = eventBuffer.poll(1000);
            // ... process events
        } catch (Exception e) {
            LOG.error("Error in async event processing thread", e);
            // ❌ No interrupt flag restoration for InterruptedException
        }
    }
}
 
{code}
 

*Risk:* Interrupted threads continue processing, preventing graceful shutdown 
*Impact:* *Service shutdown delays, resource accumulation*

 

*Fix:*

 
{code:java}
} catch (InterruptedException e) {
    // Treat interruption as the stop signal: restore the flag, then break out.
    LOG.info("Async event processing thread interrupted, shutting down "
        + "gracefully");
    Thread.currentThread().interrupt();
    break;
} catch (Exception e) {
    LOG.error("Error in async event processing thread", e);
    // Continue processing other events for non-interrupt exceptions
}
{code}
 

*Location:* ReconDBProvider.java:

 
{code:java}
/**
 * Closes the DB store, if one is set, and clears the reference.
 * Quoted to illustrate the problem: nothing is checked or flushed before
 * {@code dbStore.close()} is called.
 *
 * @throws Exception declared by the signature; presumably propagated from
 *     {@code dbStore.close()}
 */
public void close() throws Exception {
    if (this.dbStore != null) {
        dbStore.close();  // No transaction safety check
        dbStore = null;
    }
}
{code}
 

 

*Risk:* Closing database while transactions are active causes data corruption 
*Impact:* *Data integrity, service recovery failures*

 

*Fix:*
{code:java}
public void close() throws Exception {
    if (this.dbStore != null) {
        // Wait for ongoing transactions to complete
        if (this.dbStore instanceof RDBStore) {
            RDBStore rdbStore = (RDBStore) this.dbStore;
            // Implement transaction drain logic
            rdbStore.flushDB(false); // Ensure writes are persisted
        }
        dbStore.close();
        dbStore = null;
    }
}
{code}
 

 

 

 

 


> Ozone Recon - Reliability and data integrity improvement in code
> ----------------------------------------------------------------
>
>                 Key: HDDS-13956
>                 URL: https://issues.apache.org/jira/browse/HDDS-13956
>             Project: Apache Ozone
>          Issue Type: Task
>          Components: Ozone Recon
>    Affects Versions: 2.0.0
>            Reporter: Devesh Kumar Singh
>            Assignee: Devesh Kumar Singh
>            Priority: Major
>
> *Location:* ReconTaskControllerImpl.java
>  
>  
> {code:java}
> public synchronized void stop() {
>     LOG.info("Stopping Recon Task Controller.");
>     if (this.executorService != null) {
>         this.executorService.shutdownNow();  // No awaitTermination
>     }
>     if (this.eventProcessingExecutor != null) {
>         this.eventProcessingExecutor.shutdownNow();  // No awaitTermination
>     }
> }
> {code}
>  
>  
> *Impact:* *Service reliability, data integrity* *Likelihood:* *High (every 
> service shutdown)*
>  
> *Java 8 Compatible Fix:*
>  
>  
> {code:java}
> private static final int SHUTDOWN_TIMEOUT_SECONDS = 30;
>  
> public synchronized void stop() {
>     LOG.info("Stopping Recon Task Controller.");
>     shutdownExecutorGracefully(this.executorService, "main task executor");
>     shutdownExecutorGracefully(this.eventProcessingExecutor,
>         "event processing executor");
> }
>  
> {code}
>  
>  
> {code:java}
> private void shutdownExecutorGracefully(ExecutorService executor, String 
> name) {
>     if (executor == null) return;
>     
>     executor.shutdown();
>     try {
>         if (!executor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, 
> TimeUnit.SECONDS)) {
>             LOG.warn("Executor {} did not terminate within {} seconds, "
>                      + "forcing shutdown",
>                      name, SHUTDOWN_TIMEOUT_SECONDS);
>             executor.shutdownNow();
>             if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
>                 LOG.error("Executor {} did not terminate after forced "
>                           + "shutdown", name);
>             }
>         }
>     } catch (InterruptedException e) {
>         LOG.warn("Interrupted while waiting for {} to terminate", name);
>         executor.shutdownNow();
>         Thread.currentThread().interrupt();
>     }
> }
> {code}
>  
> *Location:* OzoneManagerServiceProviderImpl.java:
>  
> {code:java}
> scheduler.shutdownNow();  // No awaitTermination
>  
> {code}
>  
> *Risk:* Scheduler threads may not terminate, causing resource leaks and 
> preventing JVM shutdown *Impact:* *Resource exhaustion, service restart 
> failures*
>  
> *Fix:*
>  
> {code:java}
> private void stopSyncDataFromOMThread() {
>     scheduler.shutdown();
>     try {
>         if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
>             scheduler.shutdownNow();
>             if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
>                 LOG.error("OM sync scheduler failed to terminate");
>             }
>         }
>     } catch (InterruptedException e) {
>         scheduler.shutdownNow();
>         Thread.currentThread().interrupt();
>     }
>     tarExtractor.stop();
>     LOG.debug("Shutdown the OM DB sync scheduler.");
> }
> {code}
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to