devmadhuu commented on code in PR #9416:
URL: https://github.com/apache/ozone/pull/9416#discussion_r2592448284
##########
hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryUnifiedControl.java:
##########
@@ -183,215 +264,300 @@ void testInitialState() {
}
/**
- * Test successful single rebuild operation.
+ * Test single successful rebuild via queue.
*/
@Test
void testSingleSuccessfulRebuild() throws Exception {
- // Setup successful rebuild
- // Setup successful rebuild by default - no exception thrown
- doNothing().when(mockNamespaceSummaryManager).clearNSSummaryTable();
+ AtomicBoolean rebuildExecuted = new AtomicBoolean(false);
+ CountDownLatch rebuildLatch = new CountDownLatch(1);
- // Execute rebuild
- TaskResult result = nsSummaryTask.reprocess(mockOMMetadataManager);
+ doAnswer(invocation -> {
+ rebuildExecuted.set(true);
+ rebuildLatch.countDown();
+ return null;
+ }).when(mockNamespaceSummaryManager).clearNSSummaryTable();
- // Verify results
- assertTrue(result.isTaskSuccess(), "Rebuild should succeed");
- assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(),
- "State should return to IDLE after successful rebuild");
-
- // Verify interactions
- verify(mockNamespaceSummaryManager, times(1)).clearNSSummaryTable();
- }
+ // Queue rebuild via production API
+ ReconTaskController.ReInitializationResult result =
+ taskController.queueReInitializationEvent(
+
ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
- /**
- * Test rebuild failure sets state to FAILED.
- */
- @Test
- void testRebuildFailure() throws IOException {
- // Setup failure scenario
- doThrow(new IOException("Test
failure")).when(mockNamespaceSummaryManager).clearNSSummaryTable();
+ assertEquals(ReconTaskController.ReInitializationResult.SUCCESS, result,
+ "Rebuild should be queued successfully");
- // Execute rebuild
- TaskResult result = nsSummaryTask.reprocess(mockOMMetadataManager);
+ // Wait for async processing
+ assertTrue(rebuildLatch.await(10, TimeUnit.SECONDS),
+ "Rebuild should execute");
+ assertTrue(rebuildExecuted.get(), "Rebuild should have executed");
- // Verify results
- assertFalse(result.isTaskSuccess(), "Rebuild should fail");
- assertEquals(RebuildState.FAILED, NSSummaryTask.getRebuildState(),
- "State should be FAILED after rebuild failure");
+ // Allow time for state to return to IDLE
+ Thread.sleep(500);
+ assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(),
+ "State should return to IDLE after successful rebuild");
}
/**
- * Test concurrent rebuild attempts - second call should be rejected.
+ * Test rebuild failure sets proper state.
*/
@Test
- void testConcurrentRebuildPrevention() throws Exception {
- CountDownLatch startLatch = new CountDownLatch(1);
- CountDownLatch finishLatch = new CountDownLatch(1);
- AtomicBoolean firstRebuildStarted = new AtomicBoolean(false);
- AtomicBoolean secondRebuildRejected = new AtomicBoolean(false);
+ void testRebuildFailure() throws Exception {
+ CountDownLatch failureLatch = new CountDownLatch(1);
- // Setup first rebuild to block until we signal
doAnswer(invocation -> {
- firstRebuildStarted.set(true);
- startLatch.countDown();
- // Wait for test to signal completion
- boolean awaitSuccess = finishLatch.await(10, TimeUnit.SECONDS);
- if (!awaitSuccess) {
- LOG.warn("finishLatch.await() timed out");
- }
- return null;
+ failureLatch.countDown();
+ throw new IOException("Test failure");
}).when(mockNamespaceSummaryManager).clearNSSummaryTable();
- ExecutorService executor = Executors.newFixedThreadPool(2);
+ ReconTaskController.ReInitializationResult result =
+ taskController.queueReInitializationEvent(
+
ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
- try {
- // Start first rebuild asynchronously
- CompletableFuture<TaskResult> firstRebuild =
CompletableFuture.supplyAsync(() -> {
- LOG.info("Starting first rebuild");
- return nsSummaryTask.reprocess(mockOMMetadataManager);
- }, executor);
+ assertEquals(ReconTaskController.ReInitializationResult.SUCCESS, result,
+ "Rebuild should be queued successfully");
- // Wait for first rebuild to start
- assertTrue(startLatch.await(5, TimeUnit.SECONDS),
- "First rebuild should start within timeout");
- assertTrue(firstRebuildStarted.get(), "First rebuild should have
started");
- assertEquals(RebuildState.RUNNING, NSSummaryTask.getRebuildState(),
- "State should be RUNNING during first rebuild");
-
- // Attempt second rebuild - should be rejected immediately
- CompletableFuture<TaskResult> secondRebuild =
CompletableFuture.supplyAsync(() -> {
- LOG.info("Attempting second rebuild");
- TaskResult result = nsSummaryTask.reprocess(mockOMMetadataManager);
- secondRebuildRejected.set(!result.isTaskSuccess());
- return result;
- }, executor);
-
- // Get second rebuild result quickly (should be immediate rejection)
- TaskResult secondResult = secondRebuild.get(2, TimeUnit.SECONDS);
- assertFalse(secondResult.isTaskSuccess(),
- "Second rebuild should be rejected");
- assertTrue(secondRebuildRejected.get(), "Second rebuild should have been
rejected");
-
- // Signal first rebuild to complete
- finishLatch.countDown();
- TaskResult firstResult = firstRebuild.get(5, TimeUnit.SECONDS);
- assertTrue(firstResult.isTaskSuccess(), "First rebuild should succeed");
-
- // Verify final state
- assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(),
- "State should return to IDLE after first rebuild completes");
+ assertTrue(failureLatch.await(10, TimeUnit.SECONDS),
+ "Rebuild should be attempted");
- } finally {
- finishLatch.countDown(); // Ensure cleanup
- executor.shutdown();
- executor.awaitTermination(5, TimeUnit.SECONDS);
- }
+ // Allow time for state update
+ Thread.sleep(500);
+ assertEquals(RebuildState.FAILED, NSSummaryTask.getRebuildState(),
+ "State should be FAILED after rebuild failure");
}
/**
- * Test that rebuild can be triggered again after failure.
+ * Test rebuild can be triggered again after failure.
*/
@Test
void testRebuildAfterFailure() throws Exception {
+ CountDownLatch firstAttempt = new CountDownLatch(1);
+ CountDownLatch secondAttempt = new CountDownLatch(1);
+ AtomicInteger attemptCount = new AtomicInteger(0);
+
+ // Setup mock to fail first time, succeed second time
+ doAnswer(invocation -> {
+ int attempt = attemptCount.incrementAndGet();
+ LOG.info("clearNSSummaryTable attempt #{}", attempt);
+ if (attempt == 1) {
+ firstAttempt.countDown();
+ throw new IOException("First failure");
+ } else {
+ secondAttempt.countDown();
+ return null;
+ }
+ }).when(mockNamespaceSummaryManager).clearNSSummaryTable();
+
// First rebuild fails
- doThrow(new IOException("Test
failure")).when(mockNamespaceSummaryManager).clearNSSummaryTable();
-
- TaskResult failedResult = nsSummaryTask.reprocess(mockOMMetadataManager);
- assertFalse(failedResult.isTaskSuccess(), "First rebuild should fail");
+ ReconTaskController.ReInitializationResult result1 =
+ taskController.queueReInitializationEvent(
+
ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
+ assertEquals(ReconTaskController.ReInitializationResult.SUCCESS, result1,
+ "First event should be queued successfully");
+
+ assertTrue(firstAttempt.await(10, TimeUnit.SECONDS), "First rebuild should
be attempted");
+ Thread.sleep(1000);
assertEquals(RebuildState.FAILED, NSSummaryTask.getRebuildState(),
"State should be FAILED after first rebuild");
- // Second rebuild succeeds
- // Setup successful rebuild by default - no exception thrown
- doNothing().when(mockNamespaceSummaryManager).clearNSSummaryTable();
-
- TaskResult successResult = nsSummaryTask.reprocess(mockOMMetadataManager);
- assertTrue(successResult.isTaskSuccess(), "Second rebuild should succeed");
+ // Second rebuild succeeds - wait for retry delay (2 seconds)
+ Thread.sleep(2100); // Wait for RETRY_DELAY_MS (2000ms) + buffer
+
+ ReconTaskController.ReInitializationResult result2 =
+ taskController.queueReInitializationEvent(
+
ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
+ assertEquals(ReconTaskController.ReInitializationResult.SUCCESS, result2,
+ "Second event should be queued successfully after retry delay");
+
+ assertTrue(secondAttempt.await(10, TimeUnit.SECONDS), "Second rebuild
should be attempted");
+ Thread.sleep(1000);
assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(),
"State should be IDLE after successful rebuild");
}
/**
- * Test multiple concurrent attempts - only one should succeed, others
rejected.
+ * Test multiple concurrent queueReInitializationEvent() calls.
+ *
+ * This is the KEY test for production behavior - multiple threads
+ * simultaneously calling queueReInitializationEvent(), which is what
+ * actually happens in production (not direct reprocess() calls).
+ *
+ * <p>Important: The queue-based architecture provides SEQUENTIAL processing,
+ * not event deduplication. Multiple successfully queued events will execute
+ * sequentially (not concurrently). The AtomicReference in NSSummaryTask
+ * prevents concurrent execution within a single reprocess() call.
*/
@Test
- @Flaky("HDDS-13573")
+ @SuppressWarnings("methodLength")
void testMultipleConcurrentAttempts() throws Exception {
int threadCount = 5;
- CountDownLatch startLatch = new CountDownLatch(1);
- CountDownLatch finishLatch = new CountDownLatch(1);
- AtomicInteger successCount = new AtomicInteger(0);
- AtomicInteger rejectedCount = new AtomicInteger(0);
+ CountDownLatch allThreadsReady = new CountDownLatch(threadCount);
+ CountDownLatch firstRebuildStarted = new CountDownLatch(1);
+ CountDownLatch firstRebuildCanComplete = new CountDownLatch(1);
AtomicInteger clearTableCallCount = new AtomicInteger(0);
+ AtomicInteger concurrentExecutions = new AtomicInteger(0);
+ AtomicInteger maxConcurrentExecutions = new AtomicInteger(0);
+ AtomicInteger successfulQueueCount = new AtomicInteger(0);
+ AtomicInteger totalQueueAttempts = new AtomicInteger(0);
// Ensure clean initial state
NSSummaryTask.resetRebuildState();
- assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(),
+ assertEquals(RebuildState.IDLE, NSSummaryTask.getRebuildState(),
"Initial state must be IDLE");
- // Setup rebuild to block and count calls
+ // Setup rebuild to track concurrent executions
doAnswer(invocation -> {
int callNum = clearTableCallCount.incrementAndGet();
- LOG.info("clearNSSummaryTable called #{}, current state: {}", callNum,
NSSummaryTask.getRebuildState());
-
- if (callNum == 1) {
- startLatch.countDown();
- boolean awaitSuccess = finishLatch.await(10, TimeUnit.SECONDS);
- if (!awaitSuccess) {
- LOG.warn("finishLatch.await() timed out");
+ int currentConcurrent = concurrentExecutions.incrementAndGet();
+
+ // Track max concurrent executions
+ maxConcurrentExecutions.updateAndGet(max -> Math.max(max,
currentConcurrent));
+
+ LOG.info("clearNSSummaryTable call #{}, concurrent executions: {},
state: {}",
+ callNum, currentConcurrent, NSSummaryTask.getRebuildState());
+
+ try {
+ if (callNum == 1) {
+ // First call - block to allow other threads to queue
+ firstRebuildStarted.countDown();
+ boolean awaitSuccess = firstRebuildCanComplete.await(15,
TimeUnit.SECONDS);
+ if (!awaitSuccess) {
+ LOG.error("firstRebuildCanComplete.await() timed out");
+ }
+ } else {
+ // Subsequent calls - execute quickly
+ Thread.sleep(100);
}
+ } finally {
+ concurrentExecutions.decrementAndGet();
}
return null;
}).when(mockNamespaceSummaryManager).clearNSSummaryTable();
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
- CompletableFuture<Void>[] futures = new CompletableFuture[threadCount];
+ List<CompletableFuture<ReconTaskController.ReInitializationResult>>
futures = new ArrayList<>();
try {
- // Launch multiple concurrent rebuilds
+ // Launch multiple concurrent queue requests
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
- futures[i] = CompletableFuture.runAsync(() -> {
- LOG.info("Thread {} attempting rebuild, current state: {}",
threadId, NSSummaryTask.getRebuildState());
- TaskResult result = nsSummaryTask.reprocess(mockOMMetadataManager);
- if (result.isTaskSuccess()) {
- int count = successCount.incrementAndGet();
- LOG.info("Thread {} rebuild succeeded (success #{})", threadId,
count);
- } else {
- int count = rejectedCount.incrementAndGet();
- LOG.info("Thread {} rebuild rejected (rejection #{})", threadId,
count);
- }
- }, executor);
+ CompletableFuture<ReconTaskController.ReInitializationResult> future =
+ CompletableFuture.supplyAsync(() -> {
+ try {
+ // Signal thread is ready
+ allThreadsReady.countDown();
+ LOG.info("Thread {} ready, waiting for all threads", threadId);
+
+ // Wait for all threads to be ready
+ if (!allThreadsReady.await(10, TimeUnit.SECONDS)) {
+ throw new RuntimeException("Not all threads ready in time");
+ }
+
+ // Small staggered delay to create realistic race conditions
+ Thread.sleep(threadId * 10L);
+
+ LOG.info("Thread {} calling queueReInitializationEvent()",
threadId);
+ totalQueueAttempts.incrementAndGet();
+
+ ReconTaskController.ReInitializationResult result =
+ taskController.queueReInitializationEvent(
+
ReconTaskReInitializationEvent.ReInitializationReason.MANUAL_TRIGGER);
+
+ if (result ==
ReconTaskController.ReInitializationResult.SUCCESS) {
+ successfulQueueCount.incrementAndGet();
+ }
+
+ LOG.info("Thread {} completed with result={}", threadId,
result);
+ return result;
+
+ } catch (InterruptedException e) {
+ LOG.error("Thread {} interrupted", threadId, e);
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }, executor);
+ futures.add(future);
}
// Wait for first rebuild to start
- assertTrue(startLatch.await(5, TimeUnit.SECONDS),
- "At least one rebuild should start");
- assertEquals(RebuildState.RUNNING, NSSummaryTask.getRebuildState(),
- "State should be RUNNING");
-
- // Let rebuilds complete
- finishLatch.countDown();
- CompletableFuture.allOf(futures).get(10, TimeUnit.SECONDS);
+ assertTrue(firstRebuildStarted.await(15, TimeUnit.SECONDS),
+ "First rebuild should start");
+ LOG.info("First rebuild started, state: {}",
NSSummaryTask.getRebuildState());
+
+ // Give time for other threads to attempt queueing while first rebuild
is running
+ Thread.sleep(1000);
+
+ // Signal first rebuild can complete
+ firstRebuildCanComplete.countDown();
+
+ // Wait for all threads to complete queueing
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
+ .get(20, TimeUnit.SECONDS);
+
+ // Allow time for all queued events to be processed
+ Thread.sleep(3000);
+
+ // Collect and analyze results
+ long successResultCount = 0;
+ long retryLaterCount = 0;
+ long maxRetriesCount = 0;
+
+ for (CompletableFuture<ReconTaskController.ReInitializationResult>
future : futures) {
+ ReconTaskController.ReInitializationResult result = future.get();
+ switch (result) {
+ case SUCCESS:
+ successResultCount++;
+ break;
+ case RETRY_LATER:
+ retryLaterCount++;
+ break;
+ case MAX_RETRIES_EXCEEDED:
+ maxRetriesCount++;
+ break;
+ default:
+ LOG.warn("Unexpected result: {}", result);
+ }
+ }
// Debug output
- LOG.info("Final counts - Success: {}, Rejected: {}, ClearTable calls:
{}, Final state: {}",
- successCount.get(), rejectedCount.get(), clearTableCallCount.get(),
NSSummaryTask.getRebuildState());
-
- // Verify results - only one thread should have successfully executed
the rebuild
- assertEquals(1, clearTableCallCount.get(),
- "clearNSSummaryTable should only be called once due to unified
control");
- assertEquals(1, successCount.get(),
- "Exactly one rebuild should succeed");
- assertEquals(threadCount - 1, rejectedCount.get(),
- "All other rebuilds should be rejected");
+ LOG.info("Test completed - Total queue attempts: {}, Successful queues:
{}, " +
+ "Result breakdown: SUCCESS={}, RETRY_LATER={}, MAX_RETRIES={}, "
+
+ "ClearTable calls: {}, Max concurrent: {}, Final state: {}",
+ totalQueueAttempts.get(), successfulQueueCount.get(),
+ successResultCount, retryLaterCount, maxRetriesCount,
+ clearTableCallCount.get(), maxConcurrentExecutions.get(),
+ NSSummaryTask.getRebuildState());
+
+ // CRITICAL INVARIANT: No concurrent executions
+ // The queue + async processing ensures sequential (not concurrent)
execution
+ assertEquals(1, maxConcurrentExecutions.get(),
+ "Should never have concurrent executions - queue provides
serialization");
+
+ // All threads should have attempted to queue
+ assertEquals(threadCount, totalQueueAttempts.get(),
+ "All threads should have attempted to queue events");
+
+ // At least one thread should have successfully queued
+ assertTrue(successfulQueueCount.get() >= 1,
+ "At least one thread should have successfully queued rebuild");
+
+ // Multiple events may be queued and executed sequentially
+ assertTrue(clearTableCallCount.get() >= 1,
+ "At least one rebuild should execute");
+ assertTrue(clearTableCallCount.get() <= successfulQueueCount.get(),
+ "Number of executions should not exceed successfully queued events");
Review Comment:
done
##########
hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestNSSummaryUnifiedControl.java:
##########
@@ -44,100 +51,176 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.ReconContainerMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.ReconFileMetadataManager;
+import org.apache.hadoop.ozone.recon.spi.ReconGlobalStatsManager;
import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager;
+import org.apache.hadoop.ozone.recon.spi.impl.ReconDBProvider;
import org.apache.hadoop.ozone.recon.tasks.NSSummaryTask.RebuildState;
-import org.apache.hadoop.ozone.recon.tasks.ReconOmTask.TaskResult;
-import org.apache.ozone.test.tag.Flaky;
+import org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdater;
+import
org.apache.hadoop.ozone.recon.tasks.updater.ReconTaskStatusUpdaterManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Integration tests for HDDS-13443: Unified and controlled sync access to
- * retrigger of build of NSSummary tree.
- *
- * <p>These tests verify that the unified control mechanism prevents concurrent
- * rebuilds and properly manages state transitions across all entry points.
+ * Unified and controlled sync access to
+ * retrigger of build of NSSummary tree using queue-based architecture.
+ *
+ * <p>These tests verify that the queue-based unified control mechanism
+ * correctly handles concurrent queueReInitializationEvent() calls in
production.
+ *
+ * <p>Execution Flow:
+ * <pre>
+ * Multiple Concurrent Callers
+ * ↓ ↓ ↓
+ * queueReInitializationEvent() [Thread-safe public API]
+ * ↓
+ * BlockingQueue<ReconEvent> [Serialization layer]
+ * ↓
+ * Single Async Thread [Sequential processing]
+ * ↓
+ * processReInitializationEvent()
+ * ↓
+ * reInitializeTasks()
+ * ↓
+ * task.reprocess() [Only ONE execution at a time]
+ * </pre>
*/
+@SuppressWarnings("PMD.SingularField") // nsSummaryTask used via
taskController across all tests
public class TestNSSummaryUnifiedControl {
private static final Logger LOG =
LoggerFactory.getLogger(TestNSSummaryUnifiedControl.class);
+ private ReconTaskControllerImpl taskController;
private NSSummaryTask nsSummaryTask;
private ReconNamespaceSummaryManager mockNamespaceSummaryManager;
private ReconOMMetadataManager mockReconOMMetadataManager;
- private OMMetadataManager mockOMMetadataManager;
private OzoneConfiguration ozoneConfiguration;
@BeforeEach
- void setUp() throws IOException {
+ void setUp() throws Exception {
// Reset static state before each test
NSSummaryTask.resetRebuildState();
-
+
// Create mocks
mockNamespaceSummaryManager = mock(ReconNamespaceSummaryManager.class);
mockReconOMMetadataManager = mock(ReconOMMetadataManager.class);
- mockOMMetadataManager = mock(OMMetadataManager.class);
ozoneConfiguration = new OzoneConfiguration();
- // Create NSSummaryTask instance that will use mocked sub-tasks
+ // Configure small buffer for easier testing
+
ozoneConfiguration.setInt(ReconServerConfigKeys.OZONE_RECON_OM_EVENT_BUFFER_CAPACITY,
100);
+
+ // Create testable NSSummaryTask instance
nsSummaryTask = createTestableNSSummaryTask();
+
+ // Setup task controller
+ ReconTaskStatusUpdaterManager mockTaskStatusUpdaterManager =
mock(ReconTaskStatusUpdaterManager.class);
+ ReconTaskStatusUpdater mockTaskStatusUpdater =
mock(ReconTaskStatusUpdater.class);
+
when(mockTaskStatusUpdaterManager.getTaskStatusUpdater(any())).thenReturn(mockTaskStatusUpdater);
+
+ ReconDBProvider reconDbProvider = mock(ReconDBProvider.class);
+ when(reconDbProvider.getDbStore()).thenReturn(mock(DBStore.class));
+
when(reconDbProvider.getStagedReconDBProvider()).thenReturn(reconDbProvider);
+
+ ReconContainerMetadataManager reconContainerMgr =
mock(ReconContainerMetadataManager.class);
+ ReconGlobalStatsManager reconGlobalStatsManager =
mock(ReconGlobalStatsManager.class);
+ ReconFileMetadataManager reconFileMetadataManager =
mock(ReconFileMetadataManager.class);
+
+ taskController = new ReconTaskControllerImpl(ozoneConfiguration, new
HashSet<>(),
+ mockTaskStatusUpdaterManager, reconDbProvider, reconContainerMgr,
mockNamespaceSummaryManager,
+ reconGlobalStatsManager, reconFileMetadataManager);
+
+ taskController.registerTask(nsSummaryTask);
+
+ // Setup mock OM metadata manager with checkpoint support
+ setupMockOMMetadataManager();
+ taskController.updateOMMetadataManager(mockReconOMMetadataManager);
+
+ // Setup successful rebuild by default
+ doNothing().when(mockNamespaceSummaryManager).clearNSSummaryTable();
+
+ // Start async processing
+ taskController.start();
}
@AfterEach
void tearDown() {
- // Reset static state after each test to ensure test isolation
+ // Reset static state after each test
NSSummaryTask.resetRebuildState();
+
+ // Shutdown task controller
+ if (taskController != null) {
+ taskController.stop();
+ }
}
-
- /**
- * Create a testable NSSummaryTask that uses mocked sub-tasks for successful
execution.
- */
+
+ private void setupMockOMMetadataManager() throws IOException {
+ DBStore mockDBStore = mock(DBStore.class);
+ File mockDbLocation = mock(File.class);
+ DBCheckpoint mockCheckpoint = mock(DBCheckpoint.class);
+ Path mockCheckpointPath = Paths.get("/tmp/test/checkpoint");
+
+ when(mockReconOMMetadataManager.getStore()).thenReturn(mockDBStore);
+ when(mockDBStore.getDbLocation()).thenReturn(mockDbLocation);
+ when(mockDbLocation.getParent()).thenReturn("/tmp/test");
+ when(mockDBStore.getCheckpoint(anyString(),
any(Boolean.class))).thenReturn(mockCheckpoint);
+
when(mockCheckpoint.getCheckpointLocation()).thenReturn(mockCheckpointPath);
+
+ ReconOMMetadataManager mockCheckpointedManager =
mock(ReconOMMetadataManager.class);
+ when(mockCheckpointedManager.getStore()).thenReturn(mockDBStore);
+
when(mockReconOMMetadataManager.createCheckpointReconMetadataManager(any(),
any()))
+ .thenReturn(mockCheckpointedManager);
+ }
+
private NSSummaryTask createTestableNSSummaryTask() {
return new NSSummaryTask(
- mockNamespaceSummaryManager,
- mockReconOMMetadataManager,
+ mockNamespaceSummaryManager,
+ mockReconOMMetadataManager,
ozoneConfiguration) {
-
+
@Override
public TaskResult buildTaskResult(boolean success) {
return super.buildTaskResult(success);
}
-
- @Override
+
+ @Override
+ public NSSummaryTask getStagedTask(ReconOMMetadataManager
stagedOmMetadataManager,
+ DBStore stagedReconDbStore) throws
IOException {
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]