devmadhuu commented on code in PR #7723:
URL: https://github.com/apache/ozone/pull/7723#discussion_r1959969111
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryTask.java:
##########
@@ -80,45 +84,88 @@ public NSSummaryTask(ReconNamespaceSummaryManager
ozoneConfiguration) {
this.reconNamespaceSummaryManager = reconNamespaceSummaryManager;
this.reconOMMetadataManager = reconOMMetadataManager;
- this.ozoneConfiguration = ozoneConfiguration;
+ long nsSummaryFlushToDBMaxThreshold = ozoneConfiguration.getLong(
+ OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD,
+ OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT);
+
this.nsSummaryTaskWithFSO = new NSSummaryTaskWithFSO(
- reconNamespaceSummaryManager,
- reconOMMetadataManager, ozoneConfiguration);
+ reconNamespaceSummaryManager, reconOMMetadataManager,
+ ozoneConfiguration, nsSummaryFlushToDBMaxThreshold);
this.nsSummaryTaskWithLegacy = new NSSummaryTaskWithLegacy(
- reconNamespaceSummaryManager,
- reconOMMetadataManager, ozoneConfiguration);
+ reconNamespaceSummaryManager, reconOMMetadataManager,
+ ozoneConfiguration, nsSummaryFlushToDBMaxThreshold);
this.nsSummaryTaskWithOBS = new NSSummaryTaskWithOBS(
- reconNamespaceSummaryManager,
- reconOMMetadataManager, ozoneConfiguration);
+ reconNamespaceSummaryManager, reconOMMetadataManager,
+ ozoneConfiguration, nsSummaryFlushToDBMaxThreshold);
}
@Override
public String getTaskName() {
return "NSSummaryTask";
}
+ /**
+ * Bucket Type Enum which mimic subtasks for their data processing.
+ */
+ public enum BucketType {
+ FSO("File System Optimized Bucket"),
+ OBS("Object Store Bucket"),
+ LEGACY("Legacy Bucket");
+
+ private final String description;
+
+ BucketType(String description) {
+ this.description = description;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+ }
+
@Override
- public Pair<String, Boolean> process(OMUpdateEventBatch events) {
- long startTime = System.currentTimeMillis();
- boolean success = nsSummaryTaskWithFSO.processWithFSO(events);
- if (!success) {
+ public TaskResult process(
+ OMUpdateEventBatch events, Map<String, Integer> subTaskSeekPosMap) {
+ boolean anyFailure = false; // Track if any bucket fails
+ Map<String, Integer> updatedSeekPositions = new HashMap<>();
+
+ // Process FSO bucket
+ Integer bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.FSO.name(),
0);
+ Pair<Integer, Boolean> bucketResult =
nsSummaryTaskWithFSO.processWithFSO(events, bucketSeek);
+ updatedSeekPositions.put(BucketType.FSO.name(), bucketResult.getLeft());
+ if (!bucketResult.getRight()) {
LOG.error("processWithFSO failed.");
+ anyFailure = true;
}
- success = nsSummaryTaskWithLegacy.processWithLegacy(events);
- if (!success) {
+
+ // Process Legacy bucket
+ bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.LEGACY.name(), 0);
+ bucketResult = nsSummaryTaskWithLegacy.processWithLegacy(events,
bucketSeek);
+ updatedSeekPositions.put(BucketType.LEGACY.name(), bucketResult.getLeft());
+ if (!bucketResult.getRight()) {
LOG.error("processWithLegacy failed.");
+ anyFailure = true;
}
- success = nsSummaryTaskWithOBS.processWithOBS(events);
- if (!success) {
+
+ // Process OBS bucket
+ bucketSeek = subTaskSeekPosMap.getOrDefault(BucketType.OBS.name(), 0);
+ bucketResult = nsSummaryTaskWithOBS.processWithOBS(events, bucketSeek);
+ updatedSeekPositions.put(BucketType.OBS.name(), bucketResult.getLeft());
+ if (!bucketResult.getRight()) {
LOG.error("processWithOBS failed.");
+ anyFailure = true;
}
- LOG.debug("{} successfully processed in {} milliseconds",
- getTaskName(), (System.currentTimeMillis() - startTime));
- return new ImmutablePair<>(getTaskName(), success);
+
+ // Return task failure if any bucket failed, while keeping each bucket's latest seek position
+ return new TaskResult.Builder()
Review Comment:
Yes, that can be done. However, because we maintain the last seek position
for each sub-task (here, each bucket type), we want to keep running the
remaining sub-tasks even if the first sub-task fails. That way their seek
positions are preserved in this task iteration. When process() is called
again because the first sub-task failed, only the first sub-task will
actually do any work. The later sub-tasks are still invoked, but their seek
positions will already have reached the end of the event iterator, so they
will just check and return without processing anything further. Hence it is
better to try running all sub-tasks; it makes no difference whether we fail
fast or try running all sub-tasks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]