devabhishekpal commented on code in PR #7723:
URL: https://github.com/apache/ozone/pull/7723#discussion_r1959941268


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java:
##########
@@ -80,45 +84,88 @@ public NSSummaryTask(ReconNamespaceSummaryManager
                        ozoneConfiguration) {
     this.reconNamespaceSummaryManager = reconNamespaceSummaryManager;
     this.reconOMMetadataManager = reconOMMetadataManager;
-    this.ozoneConfiguration = ozoneConfiguration;
+    long nsSummaryFlushToDBMaxThreshold = ozoneConfiguration.getLong(
+        OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD,
+        OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT);
+
     this.nsSummaryTaskWithFSO = new NSSummaryTaskWithFSO(
-        reconNamespaceSummaryManager,
-        reconOMMetadataManager, ozoneConfiguration);
+        reconNamespaceSummaryManager, reconOMMetadataManager,
+        ozoneConfiguration, nsSummaryFlushToDBMaxThreshold);
     this.nsSummaryTaskWithLegacy = new NSSummaryTaskWithLegacy(
-        reconNamespaceSummaryManager,
-        reconOMMetadataManager, ozoneConfiguration);
+        reconNamespaceSummaryManager, reconOMMetadataManager,
+        ozoneConfiguration, nsSummaryFlushToDBMaxThreshold);
     this.nsSummaryTaskWithOBS = new NSSummaryTaskWithOBS(
-        reconNamespaceSummaryManager,
-        reconOMMetadataManager, ozoneConfiguration);
+        reconNamespaceSummaryManager, reconOMMetadataManager,
+        ozoneConfiguration, nsSummaryFlushToDBMaxThreshold);
   }
 
   @Override
   public String getTaskName() {
     return "NSSummaryTask";
   }
 
+  /**
+   * Bucket Type Enum which mimics subtasks for their data processing.
+   */
+  public enum BucketType {
+    FSO("File System Optimized Bucket"),
+    OBS("Object Store Bucket"),
+    LEGACY("Legacy Bucket");
+
+    private final String description;
+
+    BucketType(String description) {
+      this.description = description;
+    }
+
+    public String getDescription() {
+      return description;
+    }
+  }
+
   @Override
-  public Pair<String, Boolean> process(OMUpdateEventBatch events) {
-    long startTime = System.currentTimeMillis();
-    boolean success = nsSummaryTaskWithFSO.processWithFSO(events);
-    if (!success) {
+  public TaskResult process(
+      OMUpdateEventBatch events, Map<String, Integer> subTaskSeekPosMap) {
+    boolean anyFailure = false; // Track if any bucket fails
+    Map<String, Integer> updatedSeekPositions = new HashMap<>();
+
+    // Process FSO bucket
+    Integer bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.FSO.name(), 0);
+    Pair<Integer, Boolean> bucketResult = nsSummaryTaskWithFSO.processWithFSO(events, bucketSeek);
+    updatedSeekPositions.put(BucketType.FSO.name(), bucketResult.getLeft());
+    if (!bucketResult.getRight()) {
       LOG.error("processWithFSO failed.");
+      anyFailure = true;
     }
-    success = nsSummaryTaskWithLegacy.processWithLegacy(events);
-    if (!success) {
+
+    // Process Legacy bucket
+    bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.LEGACY.name(), 0);
+    bucketResult = nsSummaryTaskWithLegacy.processWithLegacy(events, bucketSeek);
+    updatedSeekPositions.put(BucketType.LEGACY.name(), bucketResult.getLeft());
+    if (!bucketResult.getRight()) {
       LOG.error("processWithLegacy failed.");
+      anyFailure = true;
     }
-    success = nsSummaryTaskWithOBS.processWithOBS(events);
-    if (!success) {
+
+    // Process OBS bucket
+    bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.OBS.name(), 0);
+    bucketResult = nsSummaryTaskWithOBS.processWithOBS(events, bucketSeek);
+    updatedSeekPositions.put(BucketType.OBS.name(), bucketResult.getLeft());
+    if (!bucketResult.getRight()) {
       LOG.error("processWithOBS failed.");
+      anyFailure = true;
     }
-    LOG.debug("{} successfully processed in {} milliseconds",
-        getTaskName(), (System.currentTimeMillis() - startTime));
-    return new ImmutablePair<>(getTaskName(), success);
+
+    // Return task failure if any bucket failed, while keeping each bucket's latest seek position
+    return new TaskResult.Builder()

Review Comment:
   If any sub-process fails, the whole task is marked as a failure. Wouldn't it
   be faster to return a failure as soon as any one of the sub-processes fails,
   instead of waiting for all three bucket types to finish processing?
   The Task Controller would trigger another process() call anyway if the task
   fails.
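   A minimal sketch of the fail-fast alternative suggested above, written as standalone Java so it compiles on its own. `SubTask`, `Result`, and `processFailFast` are hypothetical stand-ins, not Ozone's `NSSummaryTaskWithFSO` or `TaskResult` APIs. Each subtask takes its saved seek position and returns a (new seek position, success) pair, mirroring the `Pair<Integer, Boolean>` in the diff; processing stops at the first failure.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class FailFastSketch {

  /** Hypothetical stand-in for one bucket subtask (FSO, LEGACY or OBS). */
  @FunctionalInterface
  public interface SubTask {
    /** Processes events from seekPos; returns (new seek position, success). */
    Map.Entry<Integer, Boolean> process(int seekPos);
  }

  /** Hypothetical result holder; not Ozone's TaskResult. */
  public static final class Result {
    public final boolean success;
    public final Map<String, Integer> seekPositions;

    Result(boolean success, Map<String, Integer> seekPositions) {
      this.success = success;
      this.seekPositions = seekPositions;
    }
  }

  /**
   * Runs subtasks in insertion order and stops at the first failure.
   * Subtasks that never ran keep their previous seek position, so a
   * retry resumes them from where they were.
   */
  public static Result processFailFast(Map<String, SubTask> subTasks,
                                       Map<String, Integer> seekPosMap) {
    Map<String, Integer> updated = new TreeMap<>(seekPosMap);
    for (Map.Entry<String, SubTask> entry : subTasks.entrySet()) {
      int seek = seekPosMap.getOrDefault(entry.getKey(), 0);
      Map.Entry<Integer, Boolean> result = entry.getValue().process(seek);
      updated.put(entry.getKey(), result.getKey());
      if (!result.getValue()) {
        // Fail fast: skip the remaining subtasks and report failure now.
        return new Result(false, updated);
      }
    }
    return new Result(true, updated);
  }

  public static void main(String[] args) {
    Map<String, SubTask> subTasks = new LinkedHashMap<>();
    subTasks.put("FSO", seek -> new SimpleImmutableEntry<>(seek + 10, true));
    subTasks.put("LEGACY", seek -> new SimpleImmutableEntry<>(seek + 3, false));
    // Never reached because LEGACY fails first.
    subTasks.put("OBS", seek -> {
      throw new IllegalStateException("OBS should not run");
    });
    Result r = processFailFast(subTasks, new HashMap<>());
    System.out.println(r.success + " " + r.seekPositions); // false {FSO=10, LEGACY=3}
  }
}
```

   One trade-off to weigh: because each bucket type keeps its own seek position, running all three lets the healthy subtasks advance even when one fails, while failing fast returns sooner but leaves the later subtasks to catch up on the retry.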



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to