devmadhuu commented on code in PR #7517:
URL: https://github.com/apache/ozone/pull/7517#discussion_r1908931924


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconScmTask.java:
##########
@@ -30,37 +30,33 @@ public abstract class ReconScmTask {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ReconScmTask.class);
   private Thread taskThread;
-  private ReconTaskStatusDao reconTaskStatusDao;
   private volatile boolean running;
+  private final ReconTaskStatusUpdater taskStatusUpdater;
 
-  protected ReconScmTask(ReconTaskStatusDao reconTaskStatusDao) {
-    this.reconTaskStatusDao = reconTaskStatusDao;
-  }
-
-  private void register() {
-    String taskName = getTaskName();
-    if (!reconTaskStatusDao.existsById(taskName)) {
-      ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(
-          taskName, 0L, 0L);
-      reconTaskStatusDao.insert(reconTaskStatusRecord);
-      LOG.info("Registered {} task ", taskName);
-    }
+  protected ReconScmTask(
+      ReconTaskStatusUpdaterManager taskStatusUpdaterManager
+  ) {
+    // In case the task is not already present in the DB, table is updated 
with initial values for task
+    this.taskStatusUpdater = 
taskStatusUpdaterManager.getTaskStatusUpdater(getTaskName());
   }
 
   /**
    * Start underlying start thread.
    */
   public synchronized void start() {
-    register();
-    if (!isRunning()) {
-      LOG.info("Starting {} Thread.", getTaskName());
-      running = true;
-      taskThread = new Thread(this::run, "Recon-" + getTaskName());
-      taskThread.setName(getTaskName());
-      taskThread.setDaemon(true);
-      taskThread.start();
-    } else {
-      LOG.info("{} Thread is already running.", getTaskName());
+    try {
+      if (!isRunning()) {
+        LOG.info("Starting {} Thread.", getTaskName());
+        running = true;
+        taskThread = new Thread(this::run, "Recon-" + getTaskName());
+        taskThread.setName(getTaskName());
+        taskThread.setDaemon(true);
+        taskThread.start();
+      } else {
+        LOG.info("{} Thread is already running.", getTaskName());
+      }
+    } catch (Exception e) {

Review Comment:
   I don't see any use case for adding a catch block here. If a task thread 
throws any exception, it is already caught by the `Throwable` catch in the 
`run` method of each task, so this catch seems redundant.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -571,45 +551,53 @@ public boolean syncDataFromOM() {
         if (currentSequenceNumber <= 0) {
           fullSnapshot = true;
         } else {
+          reconTaskUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(
+              OmSnapshotTaskName.OmDeltaRequest.name());
+
           try (OMDBUpdatesHandler omdbUpdatesHandler =
               new OMDBUpdatesHandler(omMetadataManager)) {
             LOG.info("Obtaining delta updates from Ozone Manager");
-            // Get updates from OM and apply to local Recon OM DB.
+            // Get updates from OM and apply to local Recon OM DB and update 
task status in table
+            reconTaskUpdater.recordRunStart();
             getAndApplyDeltaUpdatesFromOM(currentSequenceNumber,
                 omdbUpdatesHandler);
-            // Update timestamp of successful delta updates query.
-            ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(
-                OmSnapshotTaskName.OmDeltaRequest.name(),
-                System.currentTimeMillis(), getCurrentOMDBSequenceNumber());
-            reconTaskStatusDao.update(reconTaskStatusRecord);
 
+            reconTaskUpdater.setLastTaskRunStatus(0);
+            
reconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
             // Pass on DB update events to tasks that are listening.
             reconTaskController.consumeOMEvents(new OMUpdateEventBatch(
                 omdbUpdatesHandler.getEvents()), omMetadataManager);
+          } catch (IOException | RocksDBException e) {

Review Comment:
   Not sure why these catch clauses are needed. They also seem redundant, as 
they do not handle anything specific to `IOException` or `RocksDBException`. 
We already have the parent `Exception` catch clause.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -571,45 +551,53 @@ public boolean syncDataFromOM() {
         if (currentSequenceNumber <= 0) {
           fullSnapshot = true;
         } else {
+          reconTaskUpdater = taskStatusUpdaterManager.getTaskStatusUpdater(
+              OmSnapshotTaskName.OmDeltaRequest.name());
+
           try (OMDBUpdatesHandler omdbUpdatesHandler =
               new OMDBUpdatesHandler(omMetadataManager)) {
             LOG.info("Obtaining delta updates from Ozone Manager");
-            // Get updates from OM and apply to local Recon OM DB.
+            // Get updates from OM and apply to local Recon OM DB and update 
task status in table
+            reconTaskUpdater.recordRunStart();
             getAndApplyDeltaUpdatesFromOM(currentSequenceNumber,
                 omdbUpdatesHandler);
-            // Update timestamp of successful delta updates query.
-            ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(
-                OmSnapshotTaskName.OmDeltaRequest.name(),
-                System.currentTimeMillis(), getCurrentOMDBSequenceNumber());
-            reconTaskStatusDao.update(reconTaskStatusRecord);
 
+            reconTaskUpdater.setLastTaskRunStatus(0);
+            
reconTaskUpdater.setLastUpdatedSeqNumber(getCurrentOMDBSequenceNumber());
             // Pass on DB update events to tasks that are listening.
             reconTaskController.consumeOMEvents(new OMUpdateEventBatch(
                 omdbUpdatesHandler.getEvents()), omMetadataManager);
+          } catch (IOException | RocksDBException e) {
+              LOG.error("Failed to get and apply delta updates with 
exception", e);
+              reconTaskUpdater.setLastTaskRunStatus(-1);
+              fullSnapshot = true;
           } catch (InterruptedException intEx) {
             Thread.currentThread().interrupt();
           } catch (Exception e) {
             metrics.incrNumDeltaRequestsFailed();
+            reconTaskUpdater.setLastTaskRunStatus(-1);
             LOG.warn("Unable to get and apply delta updates from OM.",
                 e.getMessage());
             fullSnapshot = true;
+          } finally {
+            // Update timestamp of successful delta updates query.
+            reconTaskUpdater.recordRunCompletion();

Review Comment:
   Ideally, we should not wait for the downstream tasks to finish 
`consumeOMEvents` before updating the status of the delta update task. The 
delta update task status should be updated immediately after the events are 
fetched and applied, i.e. after line 566.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconScmTask.java:
##########
@@ -100,5 +91,26 @@ public String getTaskName() {
     return getClass().getSimpleName();
   }
 
+  public ReconTaskStatusUpdater getTaskStatusUpdater() {
+    return this.taskStatusUpdater;
+  }
+
   protected abstract void run();
+
+  protected void initializeAndRunTask() {
+    try {
+      taskStatusUpdater.recordRunStart();
+      runTask();
+    } catch (Exception e) {

Review Comment:
   This catch is also redundant.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -628,10 +617,13 @@ public boolean syncDataFromOM() {
             Thread.currentThread().interrupt();
           } catch (Exception e) {
             metrics.incrNumSnapshotRequestsFailed();
+            reconTaskUpdater.setLastTaskRunStatus(-1);
             LOG.error("Unable to update Recon's metadata with new OM DB. ", e);
             // Update health status in ReconContext
             reconContext.updateHealthStatus(new AtomicBoolean(false));
             
reconContext.updateErrors(ReconContext.ErrorCode.GET_OM_DB_SNAPSHOT_FAILED);
+          } finally {
+            reconTaskUpdater.recordRunCompletion();

Review Comment:
   Same comment here. Ideally, we should not wait for all downstream tasks to 
finish reprocessing the OM DB tables before updating the status of the full 
snapshot task. The full snapshot task status should be updated immediately 
after the full OM DB is fetched and applied, i.e. after line 600.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java:
##########
@@ -97,55 +95,46 @@ public void registerTask(ReconOmTask task) {
    * reprocess call more than 2 times across events, it is unregistered
    * (ignored).
    * @param events set of events
-   * @throws InterruptedException
    */
   @Override
-  public synchronized void consumeOMEvents(OMUpdateEventBatch events,
-                              OMMetadataManager omMetadataManager)
-      throws InterruptedException {
+  public synchronized void consumeOMEvents(OMUpdateEventBatch events, 
OMMetadataManager omMetadataManager) {

Review Comment:
   If this method no longer throws `InterruptedException` as declared in the 
`ReconTaskController` interface, then update the method declaration in the 
interface as well.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java:
##########
@@ -166,57 +155,61 @@ private void ignoreFailedTasks(List<String> failedTasks) {
   }
 
   @Override
-  public synchronized void reInitializeTasks(
-      ReconOMMetadataManager omMetadataManager) throws InterruptedException {
-    try {
-      Collection<Callable<Pair<String, Boolean>>> tasks = new ArrayList<>();
-      for (Map.Entry<String, ReconOmTask> taskEntry :
-          reconOmTasks.entrySet()) {
-        ReconOmTask task = taskEntry.getValue();
-        tasks.add(() -> task.reprocess(omMetadataManager));
-      }
-      List<Future<Pair<String, Boolean>>> results =
-          executorService.invokeAll(tasks);
-      for (Future<Pair<String, Boolean>> f : results) {
-        String taskName = f.get().getLeft();
-        if (!f.get().getRight()) {
-          LOG.info("Init failed for task {}.", taskName);
-        } else {
-          //store the timestamp for the task
-          ReconTaskStatus reconTaskStatusRecord = new ReconTaskStatus(taskName,
-              System.currentTimeMillis(),
-              omMetadataManager.getLastSequenceNumberFromDB());
-          reconTaskStatusDao.update(reconTaskStatusRecord);
-        }
-      }
-    } catch (ExecutionException e) {
-      LOG.error("Unexpected error : ", e);
+  public synchronized void reInitializeTasks(ReconOMMetadataManager 
omMetadataManager) {

Review Comment:
   If this method no longer throws `InterruptedException` as declared in the 
`ReconTaskController` interface, then update the method declaration in the 
interface as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to