swamirishi commented on code in PR #9393:
URL: https://github.com/apache/ozone/pull/9393#discussion_r2593157179
##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestSnapshotDefragService.java:
##########
@@ -195,11 +287,646 @@ public void testNeedsDefragmentationRequiresDefrag()
throws IOException {
verify(provider).close();
}
+ private Map<String, InMemoryTestTable<String, CodecBuffer>>
createTables(String... tableNames) {
+ return createTables(dummyTableValues, tableNames);
+ }
+
+ private Map<String, InMemoryTestTable<String, CodecBuffer>> createTables(
+ Map<String, CodecBuffer> tableValues, String... tableNames) {
+ Map<String, InMemoryTestTable<String, CodecBuffer>> tables = new
HashMap<>();
+ for (String tableName : tableNames) {
+ tables.put(tableName, new StringInMemoryTestTable<>(tableValues,
tableName));
+ }
+ return tables;
+ }
+
+ @Test
+ public void testPerformFullDefragmentation() throws Exception {
+ DBStore checkpointDBStore = mock(DBStore.class);
+ Map<String, InMemoryTestTable<String, CodecBuffer>> tableMap =
createTables("cf1", "cf2", "cf3");
+ TablePrefixInfo prefixInfo = new TablePrefixInfo(ImmutableMap.of("cf1",
"ab", "cf2", "cd",
+ "cf3", "ef"));
+ Map<String, Map<String, CodecBuffer>> tablesCompacted = new HashMap<>();
+ Set<String> incrementalTables = Stream.of("cf1",
"cf2").collect(Collectors.toSet());
+ when(checkpointDBStore.getTable(anyString(), eq(StringCodec.get()),
eq(CodecBufferCodec.get(true))))
+ .thenAnswer(i -> {
+ String tableName = i.getArgument(0, String.class);
+ return tableMap.getOrDefault(tableName, null);
+ });
+ doAnswer(i -> {
+ String tableName = i.getArgument(0, String.class);
+ Map<String, CodecBuffer> table = new
HashMap<>(tableMap.get(tableName).getMap());
+ tablesCompacted.putIfAbsent(tableName, table);
+ return null;
+
+ }).when(checkpointDBStore).compactTable(anyString(), any());
+
+ defragService.performFullDefragmentation(checkpointDBStore, prefixInfo,
incrementalTables);
+ assertEquals(2, tablesCompacted.size());
+ for (Map.Entry<String, Map<String, CodecBuffer>> compactedTable :
tablesCompacted.entrySet()) {
+ String prefix = prefixInfo.getTablePrefix(compactedTable.getKey());
+ Map<String, String> compactedStringTable =
compactedTable.getValue().entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
getFromCodecBuffer(e.getValue())));
+ Map<String, String> expectedValue = dummyTableValues.entrySet().stream()
+ .filter(e -> e.getKey().startsWith(prefix))
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
getFromCodecBuffer(e.getValue())));
+ assertEquals(expectedValue, compactedStringTable);
+ assertEquals(expectedValue,
tableMap.get(compactedTable.getKey()).getMap().entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
getFromCodecBuffer(e.getValue()))));
+ }
+ for (Map.Entry<String, InMemoryTestTable<String, CodecBuffer>>
nonCompactedTable : tableMap.entrySet()) {
+ if (!tablesCompacted.containsKey(nonCompactedTable.getKey())) {
+ assertEquals(dummyTableValues, nonCompactedTable.getValue().getMap());
+ }
+ }
+ }
+
+ @Test
+ public void testIngestNonIncrementalTables() throws Exception {
+ DBStore checkpointDBStore = mock(DBStore.class);
+ DBStore snapshotDBStore = mock(DBStore.class);
+ SnapshotInfo snapshotInfo = createMockSnapshotInfo(UUID.randomUUID(),
"vol1", "bucket1", "snap1");
+ TablePrefixInfo prefixInfo = new TablePrefixInfo(ImmutableMap.of("cf1",
"ab", "cf2", "cd",
+ "cf3", "ef", "cf4", ""));
+ Set<String> incrementalTables = Stream.of("cf1",
"cf2").collect(Collectors.toSet());
+ Map<String, String> dumpedFileName = new HashMap<>();
+ Map<String, Table<CodecBuffer, CodecBuffer>> snapshotTables =
Stream.of("cf1", "cf2", "cf3", "cf4", "cf5")
+ .map(name -> {
+ Table<CodecBuffer, CodecBuffer> table = mock(Table.class);
+ when(table.getName()).thenReturn(name);
+ try {
+ doAnswer(i -> {
+ CodecBuffer prefixBytes = i.getArgument(1) == null ? null :
i.getArgument(1, CodecBuffer.class);
+ String prefix = prefixBytes == null ? "" :
StringCodec.get().fromCodecBuffer(prefixBytes);
+ assertEquals(prefixInfo.getTablePrefix(name), prefix);
+ dumpedFileName.put(name, i.getArgument(0,
File.class).toPath().toAbsolutePath().toString());
+ return null;
+ }).when(table).dumpToFileWithPrefix(any(File.class), any());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return table;
+ }).collect(Collectors.toMap(Table::getName, Function.identity()));
+ Map<String, String> ingestedFiles = new HashMap<>();
+ Map<String, Table> checkpointTables = Stream.of("cf3", "cf4", "cf5")
+ .map(name -> {
+ Table<CodecBuffer, CodecBuffer> table = mock(Table.class);
+ when(table.getName()).thenReturn(name);
+ try {
+ doAnswer(i -> {
+ File file = i.getArgument(0, File.class);
+ ingestedFiles.put(name,
file.toPath().toAbsolutePath().toString());
+ return null;
+ }).when(table).loadFromFile(any(File.class));
+ } catch (RocksDatabaseException e) {
+ throw new RuntimeException(e);
+ }
+ return table;
+ }).collect(Collectors.toMap(Table::getName, Function.identity()));
+
+ OmSnapshot snapshot = mock(OmSnapshot.class);
+ OmMetadataManagerImpl snapshotMetadataManager =
mock(OmMetadataManagerImpl.class);
+ UncheckedAutoCloseableSupplier<OmSnapshot> snapshotSupplier = new
UncheckedAutoCloseableSupplier<OmSnapshot>() {
+ @Override
+ public OmSnapshot get() {
+ return snapshot;
+ }
+
+ @Override
+ public void close() {
+
+ }
+ };
+
+ when(omSnapshotManager.getActiveSnapshot(anyString(), anyString(),
anyString())).thenReturn(snapshotSupplier);
+ when(snapshot.getMetadataManager()).thenReturn(snapshotMetadataManager);
+ when(snapshotMetadataManager.getStore()).thenReturn(snapshotDBStore);
+ List<Table<?, ?>> snapshotTableList = new
ArrayList<>(snapshotTables.values());
+ when(snapshotDBStore.listTables()).thenReturn(snapshotTableList);
+
+ doAnswer(i -> {
+ String tableName = i.getArgument(0, String.class);
+ return snapshotTables.get(tableName);
+ }).when(snapshotDBStore).getTable(anyString(),
eq(CodecBufferCodec.get(true)), eq(CodecBufferCodec.get(true)));
+ doAnswer(i -> {
+ String tableName = i.getArgument(0, String.class);
+ return checkpointTables.get(tableName);
+ }).when(checkpointDBStore).getTable(anyString(),
eq(CodecBufferCodec.get(true)), eq(CodecBufferCodec.get(true)));
+
+ defragService.ingestNonIncrementalTables(checkpointDBStore, snapshotInfo,
prefixInfo, incrementalTables);
+ assertEquals(checkpointTables.keySet(), dumpedFileName.keySet());
+ assertEquals(dumpedFileName, ingestedFiles);
+ }
+
+ private void assertContents(Map<String, Map<String, String>> tableContents,
DBStore dbStore)
+ throws RocksDatabaseException, CodecException {
+ for (String tableName : dbStore.getTableNames().values()) {
+ Table<String, String> table = dbStore.getTable(tableName,
StringCodec.get(), StringCodec.get());
+ try (Table.KeyValueIterator<String, String> iterator = table.iterator())
{
+ Map<String, String> expectedContents = tableContents.get(tableName);
+ Map<String, String> actualContents = new HashMap<>();
+ while (iterator.hasNext()) {
+ Table.KeyValue<String, String> kv = iterator.next();
+ actualContents.put(kv.getKey(), kv.getValue());
+ }
+ assertNotNull(expectedContents, "Expected contents for table " +
tableName + " is null");
+ assertEquals(expectedContents, actualContents, "Table contents
mismatch for table " + tableName);
+ }
+ }
+ }
+
+ private static Stream<Arguments> testCreateCheckpointCases() {
+ // Have random tables to be incremental to ensure content gets preserved.
+ return Stream.of(
+ Arguments.of(ImmutableSet.of(KEY_TABLE, BUCKET_TABLE,
DIRECTORY_TABLE)),
+ Arguments.of(ImmutableSet.of(FILE_TABLE, DIRECTORY_TABLE, KEY_TABLE)),
+ Arguments.of(ImmutableSet.of(VOLUME_TABLE, BUCKET_TABLE,
DELEGATION_TOKEN_TABLE))
+ );
+ }
+
+ private Map<String, Map<String, String>> createTableContents(Path path,
String keyPrefix) throws IOException {
+ DBCheckpoint snapshotCheckpointLocation = new RocksDBCheckpoint(path);
+ Map<String, Map<String, String>> tableContents = new HashMap<>();
+ try (OmMetadataManagerImpl metadataManager =
OmMetadataManagerImpl.createCheckpointMetadataManager(configuration,
+ snapshotCheckpointLocation, false)) {
+ Set<String> metadataManagerTables = new
HashSet<>(metadataManager.listTableNames());
+ for (String tableName :
metadataManager.getStore().getTableNames().values()) {
+ if (metadataManagerTables.contains(tableName)) {
+ Table<String, String> table =
metadataManager.getStore().getTable(tableName,
+ StringCodec.get(), StringCodec.get());
+ for (int i = 0; i < 10; i++) {
+ String key = tableName + keyPrefix + i;
+ String value = "value_" + i;
+ table.put(key, value);
+ tableContents.computeIfAbsent(tableName, k -> new
HashMap<>()).put(key, value);
+ }
+ } else {
+ tableContents.put(tableName, Collections.emptyMap());
+ }
+ }
+ }
+ return tableContents;
+ }
+
+ @ParameterizedTest
+ @MethodSource("testCreateCheckpointCases")
+ public void testCreateCheckpoint(Set<String> incrementalTables) throws
Exception {
+ SnapshotInfo snapshotInfo = createMockSnapshotInfo(UUID.randomUUID(),
"vol1", "bucket1", "snap1");
+ DBCheckpoint snapshotCheckpointLocation =
+ new
RocksDBCheckpoint(tempDir.resolve(snapshotInfo.getSnapshotId().toString()));
+ Map<String, Map<String, String>> tableContents =
+
createTableContents(snapshotCheckpointLocation.getCheckpointLocation(),
"_key_");
+ UncheckedAutoCloseableSupplier<OmSnapshot> snapshotSupplier = new
UncheckedAutoCloseableSupplier<OmSnapshot>() {
+ private final OmMetadataManagerImpl snapshotMetadataManager =
+ OmMetadataManagerImpl.createCheckpointMetadataManager(configuration,
snapshotCheckpointLocation, false);
+
+ @Override
+ public OmSnapshot get() {
+ OmSnapshot snapshot = mock(OmSnapshot.class);
+
when(snapshot.getMetadataManager()).thenReturn(snapshotMetadataManager);
+ return snapshot;
+ }
+
+ @Override
+ public void close() {
+ try {
+ snapshotMetadataManager.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ };
+
+ when(omSnapshotManager.getActiveSnapshot(eq(snapshotInfo.getVolumeName()),
eq(snapshotInfo.getBucketName()),
+ eq(snapshotInfo.getName()))).thenReturn(snapshotSupplier);
+ try (OmMetadataManagerImpl result =
defragService.createCheckpoint(snapshotInfo, incrementalTables)) {
+ try (OmMetadataManagerImpl originalSnapshotStore =
+
OmMetadataManagerImpl.createCheckpointMetadataManager(configuration,
snapshotCheckpointLocation)) {
+ assertContents(tableContents, originalSnapshotStore.getStore());
+ } catch (Throwable e) {
+ throw new RuntimeException("Failed to load original snapshot store",
e);
+ }
+ // Ensure non-incremental tables are cleared.
+ tableContents.entrySet().stream()
+ .filter(e -> !incrementalTables.contains(e.getKey()))
+ .forEach(e -> e.getValue().clear());
+ assertContents(tableContents, result.getStore());
+ }
+ }
+
+ private void assertContents(Map<String, Map<String, String>> contents, Path
path) throws IOException {
+ DBCheckpoint dbCheckpoint = new RocksDBCheckpoint(path);
+ try (OmMetadataManagerImpl metadataManager =
OmMetadataManagerImpl.createCheckpointMetadataManager(configuration,
+ dbCheckpoint, true)) {
+ assertContents(contents, metadataManager.getStore());
+ }
+ }
+
+ @Test
+ public void testAtomicSwitchSnapshotDB() throws Exception {
+ UUID snapshotId = UUID.randomUUID();
+ Path checkpointPath = tempDir.resolve("checkpoint");
+ Map<String, Map<String, String>> checkpointContent =
createTableContents(checkpointPath, "_cp1_");
+ WritableOmSnapshotLocalDataProvider provider =
mock(WritableOmSnapshotLocalDataProvider.class);
+ OmSnapshotLocalData localData = mock(OmSnapshotLocalData.class);
+
+
when(snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapshotId)).thenReturn(provider);
+ when(provider.getSnapshotLocalData()).thenReturn(localData);
+ AtomicInteger version = new AtomicInteger(1);
+ when(localData.getVersion()).thenAnswer(i -> version.get());
+ doAnswer(i -> {
+ version.incrementAndGet();
+ return null;
+ }).when(provider).addSnapshotVersion(any());
+
+ Path nextVersionPath = tempDir.resolve(snapshotId + "v2").toAbsolutePath();
+ try (MockedStatic<OmSnapshotManager> mockedStatic =
Mockito.mockStatic(OmSnapshotManager.class,
+ CALLS_REAL_METHODS)) {
+ mockedStatic.when(() ->
OmSnapshotManager.getSnapshotPath(eq(metadataManager), eq(snapshotId), eq(2)))
+ .thenReturn(nextVersionPath);
+ Map<String, Map<String, String>> existingVersionContents =
createTableContents(nextVersionPath, "_cp2_");
+ assertNotEquals(existingVersionContents, checkpointContent);
+ assertContents(existingVersionContents, nextVersionPath);
+ int result = defragService.atomicSwitchSnapshotDB(snapshotId,
checkpointPath);
+ assertContents(checkpointContent, nextVersionPath);
+ assertEquals(1, result);
+ assertEquals(2, version.get());
+ }
+ assertNull(verify(provider).getSnapshotLocalData());
Review Comment:
This is to fix a FindBugs warning.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]