ArafatKhan2198 commented on code in PR #9436:
URL: https://github.com/apache/ozone/pull/9436#discussion_r2596573523


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/updater/ReconTaskStatusUpdaterManager.java:
##########
@@ -17,39 +17,116 @@
 
 package org.apache.hadoop.ozone.recon.tasks.updater;
 
+import static org.jooq.impl.DSL.name;
+
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ozone.recon.schema.generated.tables.daos.ReconTaskStatusDao;
 import org.apache.ozone.recon.schema.generated.tables.pojos.ReconTaskStatus;
+import org.jooq.DSLContext;
+import org.jooq.impl.DSL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * This class provides caching for ReconTaskStatusUpdater instances.
  * For each task we maintain a map of updater instance and provide it to 
consumers
  * to update.
  * Here we also make a single call to the TASK_STATUS_TABLE to check if 
previous values are present
- * for a task in the DB to avoid overwrite to initial state
+ * for a task in the DB to avoid overwrite to initial state.
+ *
+ * Note: The initialization is lazy to avoid reading the database during Guice 
dependency injection,
+ * which would fail during upgrades when schema changes haven't been applied 
yet.
  */
 @Singleton
 public class ReconTaskStatusUpdaterManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReconTaskStatusUpdaterManager.class);
+  private static final String RECON_TASK_STATUS_TABLE_NAME = 
"RECON_TASK_STATUS";
+
   private final ReconTaskStatusDao reconTaskStatusDao;
-  // Act as a cache for the task updater instancesF
+  // Act as a cache for the task updater instances
   private final Map<String, ReconTaskStatusUpdater> updaterCache;
+  private AtomicBoolean initialized = new AtomicBoolean(false);
 
   @Inject
   public ReconTaskStatusUpdaterManager(
       ReconTaskStatusDao reconTaskStatusDao
   ) {
     this.reconTaskStatusDao = reconTaskStatusDao;
     this.updaterCache = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Lazy initialization - loads existing tasks from DB on first access.
+   * This ensures the DB schema is ready (after upgrades have run).
+   */
+  private synchronized void ensureInitialized() {
+    if (!initialized.get()) {
+      try {
+        LOG.info("Initializing ReconTaskStatusUpdaterManager - loading 
existing tasks from DB");
+
+        List<ReconTaskStatus> tasks;
+        DSLContext dsl = DSL.using(reconTaskStatusDao.configuration());
 
-    // Fetch the tasks present in the DB already
-    List<ReconTaskStatus> tasks = reconTaskStatusDao.findAll();
-    for (ReconTaskStatus task: tasks) {
-      updaterCache.put(task.getTaskName(),
-          new ReconTaskStatusUpdater(reconTaskStatusDao, task));
+        // Check if upgrade columns exist
+        if (columnExists(dsl, "LAST_TASK_RUN_STATUS")) {
+          // Schema is upgraded - use normal DAO with all columns
+          tasks = reconTaskStatusDao.findAll();
+          LOG.debug("Loaded tasks using full schema (all columns present)");
+        } else {
+          // Schema not upgraded yet - query only base columns that always 
exist
+          LOG.debug("Upgrade columns not present, querying base columns only");
+          tasks = dsl.select(
+                  DSL.field(name("task_name")),
+                  DSL.field(name("last_updated_timestamp")),
+                  DSL.field(name("last_updated_seq_number")))
+              .from(DSL.table(RECON_TASK_STATUS_TABLE_NAME))
+              .fetch(record -> new ReconTaskStatus(
+                  record.get(DSL.field(name("task_name")), String.class),
+                  record.get(DSL.field(name("last_updated_timestamp")), 
Long.class),
+                  record.get(DSL.field(name("last_updated_seq_number")), 
Long.class),
+                  0,  // Default for last_task_run_status
+                  0   // Default for is_current_task_running
+              ));
+        }
+
+        for (ReconTaskStatus task : tasks) {
+          updaterCache.put(task.getTaskName(),
+              new ReconTaskStatusUpdater(reconTaskStatusDao, task));
+        }
+
+        LOG.info("Loaded {} existing tasks from DB", tasks.size());
+        initialized.set(true);
+      } catch (Exception e) {
+        LOG.debug("Could not load tasks from DB yet, will retry: {}", 
e.getMessage());

Review Comment:
   added the stack trace thanks!
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to