devmadhuu commented on code in PR #9436:
URL: https://github.com/apache/ozone/pull/9436#discussion_r2593246402


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/updater/ReconTaskStatusUpdaterManager.java:
##########
@@ -17,39 +17,116 @@
 
 package org.apache.hadoop.ozone.recon.tasks.updater;
 
+import static org.jooq.impl.DSL.name;
+
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ozone.recon.schema.generated.tables.daos.ReconTaskStatusDao;
 import org.apache.ozone.recon.schema.generated.tables.pojos.ReconTaskStatus;
+import org.jooq.DSLContext;
+import org.jooq.impl.DSL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * This class provides caching for ReconTaskStatusUpdater instances.
  * For each task we maintain a map of updater instance and provide it to 
consumers
  * to update.
  * Here we also make a single call to the TASK_STATUS_TABLE to check if 
previous values are present
- * for a task in the DB to avoid overwrite to initial state
+ * for a task in the DB to avoid overwrite to initial state.
+ *
+ * Note: The initialization is lazy to avoid reading the database during Guice 
dependency injection,
+ * which would fail during upgrades when schema changes haven't been applied 
yet.
  */
 @Singleton
 public class ReconTaskStatusUpdaterManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReconTaskStatusUpdaterManager.class);
+  private static final String RECON_TASK_STATUS_TABLE_NAME = 
"RECON_TASK_STATUS";
+
   private final ReconTaskStatusDao reconTaskStatusDao;
-  // Act as a cache for the task updater instancesF
+  // Act as a cache for the task updater instances
   private final Map<String, ReconTaskStatusUpdater> updaterCache;
+  private AtomicBoolean initialized = new AtomicBoolean(false);
 
   @Inject
   public ReconTaskStatusUpdaterManager(
       ReconTaskStatusDao reconTaskStatusDao
   ) {
     this.reconTaskStatusDao = reconTaskStatusDao;
     this.updaterCache = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Lazy initialization - loads existing tasks from DB on first access.
+   * This ensures the DB schema is ready (after upgrades have run).
+   */
+  private synchronized void ensureInitialized() {
+    if (!initialized.get()) {

Review Comment:
   You can implement double lock checking for performance optimization, because 
every call acquires and releases a lock, even though initialization happened 
once by first thread who enters this method and first time not initialized. So 
something like this is better implementation instead of declaring whole method 
as synchronized:
   
    ```
   private void ensureInitialized() {
       if (!initialized) {  // ← FIRST CHECK (no lock, fast!)
         synchronized (this) {  // ← Lock ONLY if not initialized
           if (!initialized) {  // ← SECOND CHECK (after lock)
             // ... initialization ...
             initialized = true;
           }
         }
       }
     }
   ```
   
   This will make sure that all threads who want to acquire the lock because to 
check if initialized or not , they will not wait and here contention will not 
be there.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java:
##########
@@ -272,6 +272,7 @@ public void start() {
         throw new RuntimeException(runtimeException);
       }
     }
+    reconTaskController.updateOMMetadataManager(omMetadataManager);

Review Comment:
   Why this line needed here to add ? ReconTaskController start is handling 
null.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/updater/ReconTaskStatusUpdaterManager.java:
##########
@@ -17,39 +17,116 @@
 
 package org.apache.hadoop.ozone.recon.tasks.updater;
 
+import static org.jooq.impl.DSL.name;
+
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ozone.recon.schema.generated.tables.daos.ReconTaskStatusDao;
 import org.apache.ozone.recon.schema.generated.tables.pojos.ReconTaskStatus;
+import org.jooq.DSLContext;
+import org.jooq.impl.DSL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * This class provides caching for ReconTaskStatusUpdater instances.
  * For each task we maintain a map of updater instance and provide it to 
consumers
  * to update.
  * Here we also make a single call to the TASK_STATUS_TABLE to check if 
previous values are present
- * for a task in the DB to avoid overwrite to initial state
+ * for a task in the DB to avoid overwrite to initial state.
+ *
+ * Note: The initialization is lazy to avoid reading the database during Guice 
dependency injection,
+ * which would fail during upgrades when schema changes haven't been applied 
yet.
  */
 @Singleton
 public class ReconTaskStatusUpdaterManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReconTaskStatusUpdaterManager.class);
+  private static final String RECON_TASK_STATUS_TABLE_NAME = 
"RECON_TASK_STATUS";
+
   private final ReconTaskStatusDao reconTaskStatusDao;
-  // Act as a cache for the task updater instancesF
+  // Act as a cache for the task updater instances
   private final Map<String, ReconTaskStatusUpdater> updaterCache;
+  private AtomicBoolean initialized = new AtomicBoolean(false);
 
   @Inject
   public ReconTaskStatusUpdaterManager(
       ReconTaskStatusDao reconTaskStatusDao
   ) {
     this.reconTaskStatusDao = reconTaskStatusDao;
     this.updaterCache = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Lazy initialization - loads existing tasks from DB on first access.
+   * This ensures the DB schema is ready (after upgrades have run).
+   */
+  private synchronized void ensureInitialized() {
+    if (!initialized.get()) {
+      try {
+        LOG.info("Initializing ReconTaskStatusUpdaterManager - loading 
existing tasks from DB");
+
+        List<ReconTaskStatus> tasks;
+        DSLContext dsl = DSL.using(reconTaskStatusDao.configuration());
 
-    // Fetch the tasks present in the DB already
-    List<ReconTaskStatus> tasks = reconTaskStatusDao.findAll();
-    for (ReconTaskStatus task: tasks) {
-      updaterCache.put(task.getTaskName(),
-          new ReconTaskStatusUpdater(reconTaskStatusDao, task));
+        // Check if upgrade columns exist
+        if (columnExists(dsl, "LAST_TASK_RUN_STATUS")) {
+          // Schema is upgraded - use normal DAO with all columns
+          tasks = reconTaskStatusDao.findAll();
+          LOG.debug("Loaded tasks using full schema (all columns present)");
+        } else {
+          // Schema not upgraded yet - query only base columns that always 
exist
+          LOG.debug("Upgrade columns not present, querying base columns only");
+          tasks = dsl.select(
+                  DSL.field(name("task_name")),
+                  DSL.field(name("last_updated_timestamp")),
+                  DSL.field(name("last_updated_seq_number")))
+              .from(DSL.table(RECON_TASK_STATUS_TABLE_NAME))
+              .fetch(record -> new ReconTaskStatus(
+                  record.get(DSL.field(name("task_name")), String.class),
+                  record.get(DSL.field(name("last_updated_timestamp")), 
Long.class),
+                  record.get(DSL.field(name("last_updated_seq_number")), 
Long.class),
+                  0,  // Default for last_task_run_status
+                  0   // Default for is_current_task_running
+              ));
+        }
+
+        for (ReconTaskStatus task : tasks) {
+          updaterCache.put(task.getTaskName(),
+              new ReconTaskStatusUpdater(reconTaskStatusDao, task));
+        }
+
+        LOG.info("Loaded {} existing tasks from DB", tasks.size());
+        initialized.set(true);
+      } catch (Exception e) {
+        LOG.debug("Could not load tasks from DB yet, will retry: {}", 
e.getMessage());

Review Comment:
   What will happen if exception gets caught here ? It will be silent failure...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to