swagle commented on a change in pull request #1259: HDDS-1105 : Add mechanism
in Recon to obtain DB snapshot 'delta' updates from Ozone Manager
URL: https://github.com/apache/hadoop/pull/1259#discussion_r313065850
##########
File path:
hadoop-ozone/ozone-recon/src/test/java/org/apache/hadoop/ozone/recon/spi/impl/TestOzoneManagerServiceProviderImpl.java
##########
@@ -158,4 +165,162 @@ public void testGetOzoneManagerDBSnapshot() throws
Exception {
assertTrue(checkpoint.getCheckpointLocation().toFile()
.listFiles().length == 2);
}
+
+ @Test
+ public void testGetAndApplyDeltaUpdatesFromOM() throws Exception {
+
+ // Writing 2 Keys into a source OM DB and collecting it in a
+ // DBUpdatesWrapper.
+ OMMetadataManager sourceOMMetadataMgr = initializeNewOmMetadataManager();
+ writeDataToOm(sourceOMMetadataMgr, "key_one");
+ writeDataToOm(sourceOMMetadataMgr, "key_two");
+
+ RocksDB rocksDB = ((RDBStore)sourceOMMetadataMgr.getStore()).getDb();
+ TransactionLogIterator transactionLogIterator =
rocksDB.getUpdatesSince(0L);
+ DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
+ while(transactionLogIterator.isValid()) {
+ TransactionLogIterator.BatchResult result =
+ transactionLogIterator.getBatch();
+ result.writeBatch().markWalTerminationPoint();
+ WriteBatch writeBatch = result.writeBatch();
+ dbUpdatesWrapper.addWriteBatch(writeBatch.data(),
+ result.sequenceNumber());
+ transactionLogIterator.next();
+ }
+
+ // OM Service Provider's Metadata Manager.
+ OMMetadataManager omMetadataManager = initializeNewOmMetadataManager();
+
+ OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
+ new OzoneManagerServiceProviderImpl(configuration,
+ getTestMetadataManager(omMetadataManager),
+ getMockTaskController(), new ReconUtils());
+ OzoneManagerProtocol ozoneManagerProtocolMock =
+ mock(OzoneManagerProtocol.class);
+ when(ozoneManagerProtocolMock.getDBUpdates(any(OzoneManagerProtocolProtos
+ .DBUpdatesRequest.class))).thenReturn(dbUpdatesWrapper);
+ injectOzoneServiceProviderField(ozoneManagerServiceProvider,
+ ozoneManagerProtocolMock, "ozoneManagerClient");
+
+ OMDBUpdatesHandler updatesHandler =
+ new OMDBUpdatesHandler(omMetadataManager);
+ ozoneManagerServiceProvider.getAndApplyDeltaUpdatesFromOM(
+ 0L, updatesHandler);
+
+ // In this method, we have to assert the "GET" part and the "APPLY" path.
+
+ // Assert GET path --> verify if the OMDBUpdatesHandler picked up the 4
+ // events ( 1 Vol PUT + 1 Bucket PUT + 2 Key PUTs).
+ assertEquals(4, updatesHandler.getEvents().size());
+
+ // Assert APPLY path --> Verify if the OM service provider's RocksDB got
+ // the changes.
+ String fullKey = omMetadataManager.getOzoneKey("sampleVol",
+ "bucketOne", "key_one");
+ assertTrue(ozoneManagerServiceProvider.getOMMetadataManagerInstance()
+ .getKeyTable().isExist(fullKey));
+ fullKey = omMetadataManager.getOzoneKey("sampleVol",
+ "bucketOne", "key_two");
+ assertTrue(ozoneManagerServiceProvider.getOMMetadataManagerInstance()
+ .getKeyTable().isExist(fullKey));
+ }
+
+ @Test
+ public void testSyncDataFromOMFullSnapshot() throws Exception {
+
+ // Empty OM DB to start with.
+ ReconOMMetadataManager omMetadataManager = getTestMetadataManager(
+ initializeEmptyOmMetadataManager());
+ ReconTaskStatusDao reconTaskStatusDaoMock =
+ mock(ReconTaskStatusDao.class);
+ doNothing().when(reconTaskStatusDaoMock)
+ .update(any(ReconTaskStatus.class));
+
+ ReconTaskController reconTaskControllerMock = getMockTaskController();
+ when(reconTaskControllerMock.getReconTaskStatusDao())
+ .thenReturn(reconTaskStatusDaoMock);
+ doNothing().when(reconTaskControllerMock)
+ .reInitializeTasks(omMetadataManager);
+
+ OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
+ new OzoneManagerServiceProviderImpl(configuration, omMetadataManager,
+ reconTaskControllerMock, new ReconUtils());
+
+ //Should trigger full snapshot request.
+ ozoneManagerServiceProvider.syncDataFromOM();
+
+ ArgumentCaptor<ReconTaskStatus> captor =
+ ArgumentCaptor.forClass(ReconTaskStatus.class);
+ verify(reconTaskStatusDaoMock, times(1))
+ .update(captor.capture());
+ assertTrue(captor.getValue().getTaskName().equals("OM_DB_FULL_SNAPSHOT"));
+ verify(reconTaskControllerMock, times(1))
+ .reInitializeTasks(omMetadataManager);
+ }
+
+ @Test
+ public void testSyncDataFromOMDeltaUpdates() throws Exception {
+
+ // Non-Empty OM DB to start with.
+ ReconOMMetadataManager omMetadataManager = getTestMetadataManager(
+ initializeNewOmMetadataManager());
+ ReconTaskStatusDao reconTaskStatusDaoMock =
+ mock(ReconTaskStatusDao.class);
+ doNothing().when(reconTaskStatusDaoMock)
+ .update(any(ReconTaskStatus.class));
+
+ ReconTaskController reconTaskControllerMock = getMockTaskController();
+ when(reconTaskControllerMock.getReconTaskStatusDao())
+ .thenReturn(reconTaskStatusDaoMock);
+ doNothing().when(reconTaskControllerMock)
+ .consumeOMEvents(any(OMUpdateEventBatch.class),
+ any(OMMetadataManager.class));
+
+ OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
+ new OzoneManagerServiceProviderImpl(configuration, omMetadataManager,
+ reconTaskControllerMock, new ReconUtils());
+ OzoneManagerProtocol ozoneManagerProtocolMock =
+ mock(OzoneManagerProtocol.class);
+ when(ozoneManagerProtocolMock.getDBUpdates(any(OzoneManagerProtocolProtos
+ .DBUpdatesRequest.class))).thenReturn(new DBUpdatesWrapper());
+ injectOzoneServiceProviderField(ozoneManagerServiceProvider,
+ ozoneManagerProtocolMock, "ozoneManagerClient");
+
+ // Should trigger delta updates.
+ ozoneManagerServiceProvider.syncDataFromOM();
+
+ ArgumentCaptor<ReconTaskStatus> captor =
+ ArgumentCaptor.forClass(ReconTaskStatus.class);
+ verify(reconTaskStatusDaoMock, times(1))
+ .update(captor.capture());
+ assertTrue(captor.getValue().getTaskName().equals("OM_DB_DELTA_UPDATES"));
+
+ verify(reconTaskControllerMock, times(1))
+ .consumeOMEvents(any(OMUpdateEventBatch.class),
+ any(OMMetadataManager.class));
+ }
+
+ private ReconTaskController getMockTaskController() {
+ ReconTaskController reconTaskControllerMock =
+ mock(ReconTaskController.class);
+ return reconTaskControllerMock;
+ }
+
+ private ReconUtils getMockReconUtils() throws IOException {
+ ReconUtils reconUtilsMock = mock(ReconUtils.class);
+ when(reconUtilsMock.getReconDbDir(any(),
anyString())).thenCallRealMethod();
+ doCallRealMethod().when(reconUtilsMock).untarCheckpointFile(any(), any());
+ return reconUtilsMock;
+ }
+
+ private void injectOzoneServiceProviderField(
+ OzoneManagerServiceProviderImpl ozoneManagerServiceProvider,
+ Object fieldValue, String fieldName)
+ throws NoSuchFieldException, IllegalAccessException {
+ Field f1 = ozoneManagerServiceProvider.getClass()
Review comment:
Let's use the @Provides method for some stateful instance creation instead
of ugly refection code :-)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]