swamirishi commented on code in PR #9393:
URL: https://github.com/apache/ozone/pull/9393#discussion_r2593157179


##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestSnapshotDefragService.java:
##########
@@ -195,11 +287,646 @@ public void testNeedsDefragmentationRequiresDefrag() throws IOException {
     verify(provider).close();
   }
 
+  private Map<String, InMemoryTestTable<String, CodecBuffer>> createTables(String... tableNames) {
+    return createTables(dummyTableValues, tableNames);
+  }
+
+  private Map<String, InMemoryTestTable<String, CodecBuffer>> createTables(
+      Map<String, CodecBuffer> tableValues, String... tableNames) {
+    Map<String, InMemoryTestTable<String, CodecBuffer>> tables = new HashMap<>();
+    for (String tableName : tableNames) {
+      tables.put(tableName, new StringInMemoryTestTable<>(tableValues, tableName));
+    }
+    return tables;
+  }
+
+  @Test
+  public void testPerformFullDefragmentation() throws Exception {
+    DBStore checkpointDBStore = mock(DBStore.class);
+    Map<String, InMemoryTestTable<String, CodecBuffer>> tableMap = createTables("cf1", "cf2", "cf3");
+    TablePrefixInfo prefixInfo = new TablePrefixInfo(ImmutableMap.of("cf1", "ab", "cf2", "cd",
+        "cf3", "ef"));
+    Map<String, Map<String, CodecBuffer>> tablesCompacted = new HashMap<>();
+    Set<String> incrementalTables = Stream.of("cf1", "cf2").collect(Collectors.toSet());
+    when(checkpointDBStore.getTable(anyString(), eq(StringCodec.get()), eq(CodecBufferCodec.get(true))))
+        .thenAnswer(i -> {
+          String tableName = i.getArgument(0, String.class);
+          return tableMap.getOrDefault(tableName, null);
+        });
+    doAnswer(i -> {
+      String tableName = i.getArgument(0, String.class);
+      Map<String, CodecBuffer> table = new HashMap<>(tableMap.get(tableName).getMap());
+      tablesCompacted.putIfAbsent(tableName, table);
+      return null;
+    }).when(checkpointDBStore).compactTable(anyString(), any());
+
+    defragService.performFullDefragmentation(checkpointDBStore, prefixInfo, incrementalTables);
+    assertEquals(2, tablesCompacted.size());
+    for (Map.Entry<String, Map<String, CodecBuffer>> compactedTable : tablesCompacted.entrySet()) {
+      String prefix = prefixInfo.getTablePrefix(compactedTable.getKey());
+      Map<String, String> compactedStringTable = compactedTable.getValue().entrySet().stream()
+          .collect(Collectors.toMap(Map.Entry::getKey, e -> getFromCodecBuffer(e.getValue())));
+      Map<String, String> expectedValue = dummyTableValues.entrySet().stream()
+          .filter(e -> e.getKey().startsWith(prefix))
+          .collect(Collectors.toMap(Map.Entry::getKey, e -> getFromCodecBuffer(e.getValue())));
+      assertEquals(expectedValue, compactedStringTable);
+      assertEquals(expectedValue, tableMap.get(compactedTable.getKey()).getMap().entrySet().stream()
+          .collect(Collectors.toMap(Map.Entry::getKey, e -> getFromCodecBuffer(e.getValue()))));
+    }
+    for (Map.Entry<String, InMemoryTestTable<String, CodecBuffer>> nonCompactedTable : tableMap.entrySet()) {
+      if (!tablesCompacted.containsKey(nonCompactedTable.getKey())) {
+        assertEquals(dummyTableValues, nonCompactedTable.getValue().getMap());
+      }
+    }
+  }
+
+  @Test
+  public void testIngestNonIncrementalTables() throws Exception {
+    DBStore checkpointDBStore = mock(DBStore.class);
+    DBStore snapshotDBStore = mock(DBStore.class);
+    SnapshotInfo snapshotInfo = createMockSnapshotInfo(UUID.randomUUID(), "vol1", "bucket1", "snap1");
+    TablePrefixInfo prefixInfo = new TablePrefixInfo(ImmutableMap.of("cf1", "ab", "cf2", "cd",
+        "cf3", "ef", "cf4", ""));
+    Set<String> incrementalTables = Stream.of("cf1", "cf2").collect(Collectors.toSet());
+    Map<String, String> dumpedFileName = new HashMap<>();
+    Map<String, Table<CodecBuffer, CodecBuffer>> snapshotTables = Stream.of("cf1", "cf2", "cf3", "cf4", "cf5")
+        .map(name -> {
+          Table<CodecBuffer, CodecBuffer> table = mock(Table.class);
+          when(table.getName()).thenReturn(name);
+          try {
+            doAnswer(i -> {
+              CodecBuffer prefixBytes = i.getArgument(1) == null ? null : i.getArgument(1, CodecBuffer.class);
+              String prefix = prefixBytes == null ? "" : StringCodec.get().fromCodecBuffer(prefixBytes);
+              assertEquals(prefixInfo.getTablePrefix(name), prefix);
+              dumpedFileName.put(name, i.getArgument(0, File.class).toPath().toAbsolutePath().toString());
+              return null;
+            }).when(table).dumpToFileWithPrefix(any(File.class), any());
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          return table;
+        }).collect(Collectors.toMap(Table::getName, Function.identity()));
+    Map<String, String> ingestedFiles = new HashMap<>();
+    Map<String, Table> checkpointTables = Stream.of("cf3", "cf4", "cf5")
+        .map(name -> {
+          Table<CodecBuffer, CodecBuffer> table = mock(Table.class);
+          when(table.getName()).thenReturn(name);
+          try {
+            doAnswer(i -> {
+              File file = i.getArgument(0, File.class);
+              ingestedFiles.put(name, file.toPath().toAbsolutePath().toString());
+              return null;
+            }).when(table).loadFromFile(any(File.class));
+          } catch (RocksDatabaseException e) {
+            throw new RuntimeException(e);
+          }
+          return table;
+        }).collect(Collectors.toMap(Table::getName, Function.identity()));
+
+    OmSnapshot snapshot = mock(OmSnapshot.class);
+    OmMetadataManagerImpl snapshotMetadataManager = mock(OmMetadataManagerImpl.class);
+    UncheckedAutoCloseableSupplier<OmSnapshot> snapshotSupplier = new UncheckedAutoCloseableSupplier<OmSnapshot>() {
+      @Override
+      public OmSnapshot get() {
+        return snapshot;
+      }
+
+      @Override
+      public void close() {
+      }
+    };
+
+    when(omSnapshotManager.getActiveSnapshot(anyString(), anyString(), anyString())).thenReturn(snapshotSupplier);
+    when(snapshot.getMetadataManager()).thenReturn(snapshotMetadataManager);
+    when(snapshotMetadataManager.getStore()).thenReturn(snapshotDBStore);
+    List<Table<?, ?>> snapshotTableList = new ArrayList<>(snapshotTables.values());
+    when(snapshotDBStore.listTables()).thenReturn(snapshotTableList);
+
+    doAnswer(i -> {
+      String tableName = i.getArgument(0, String.class);
+      return snapshotTables.get(tableName);
+    }).when(snapshotDBStore).getTable(anyString(), eq(CodecBufferCodec.get(true)), eq(CodecBufferCodec.get(true)));
+    doAnswer(i -> {
+      String tableName = i.getArgument(0, String.class);
+      return checkpointTables.get(tableName);
+    }).when(checkpointDBStore).getTable(anyString(), eq(CodecBufferCodec.get(true)), eq(CodecBufferCodec.get(true)));
+
+    defragService.ingestNonIncrementalTables(checkpointDBStore, snapshotInfo, prefixInfo, incrementalTables);
+    assertEquals(checkpointTables.keySet(), dumpedFileName.keySet());
+    assertEquals(dumpedFileName, ingestedFiles);
+  }
+
+  private void assertContents(Map<String, Map<String, String>> tableContents, DBStore dbStore)
+      throws RocksDatabaseException, CodecException {
+    for (String tableName : dbStore.getTableNames().values()) {
+      Table<String, String> table = dbStore.getTable(tableName, StringCodec.get(), StringCodec.get());
+      try (Table.KeyValueIterator<String, String> iterator = table.iterator()) {
+        Map<String, String> expectedContents = tableContents.get(tableName);
+        Map<String, String> actualContents = new HashMap<>();
+        while (iterator.hasNext()) {
+          Table.KeyValue<String, String> kv = iterator.next();
+          actualContents.put(kv.getKey(), kv.getValue());
+        }
+        assertNotNull(expectedContents, "Expected contents for table " + tableName + " is null");
+        assertEquals(expectedContents, actualContents, "Table contents mismatch for table " + tableName);
+      }
+    }
+  }
+
+  private static Stream<Arguments> testCreateCheckpointCases() {
+    // Vary which tables are incremental to ensure their content gets preserved.
+    return Stream.of(
+        Arguments.of(ImmutableSet.of(KEY_TABLE, BUCKET_TABLE, DIRECTORY_TABLE)),
+        Arguments.of(ImmutableSet.of(FILE_TABLE, DIRECTORY_TABLE, KEY_TABLE)),
+        Arguments.of(ImmutableSet.of(VOLUME_TABLE, BUCKET_TABLE, DELEGATION_TOKEN_TABLE))
+    );
+  }
+
+  private Map<String, Map<String, String>> createTableContents(Path path, String keyPrefix) throws IOException {
+    DBCheckpoint snapshotCheckpointLocation = new RocksDBCheckpoint(path);
+    Map<String, Map<String, String>> tableContents = new HashMap<>();
+    try (OmMetadataManagerImpl metadataManager = OmMetadataManagerImpl.createCheckpointMetadataManager(configuration,
+        snapshotCheckpointLocation, false)) {
+      Set<String> metadataManagerTables = new HashSet<>(metadataManager.listTableNames());
+      for (String tableName : metadataManager.getStore().getTableNames().values()) {
+        if (metadataManagerTables.contains(tableName)) {
+          Table<String, String> table = metadataManager.getStore().getTable(tableName,
+              StringCodec.get(), StringCodec.get());
+          for (int i = 0; i < 10; i++) {
+            String key = tableName + keyPrefix + i;
+            String value = "value_" + i;
+            table.put(key, value);
+            tableContents.computeIfAbsent(tableName, k -> new HashMap<>()).put(key, value);
+          }
+        } else {
+          tableContents.put(tableName, Collections.emptyMap());
+        }
+      }
+    }
+    return tableContents;
+  }
+
+  @ParameterizedTest
+  @MethodSource("testCreateCheckpointCases")
+  public void testCreateCheckpoint(Set<String> incrementalTables) throws Exception {
+    SnapshotInfo snapshotInfo = createMockSnapshotInfo(UUID.randomUUID(), "vol1", "bucket1", "snap1");
+    DBCheckpoint snapshotCheckpointLocation =
+        new RocksDBCheckpoint(tempDir.resolve(snapshotInfo.getSnapshotId().toString()));
+    Map<String, Map<String, String>> tableContents =
+        createTableContents(snapshotCheckpointLocation.getCheckpointLocation(), "_key_");
+    UncheckedAutoCloseableSupplier<OmSnapshot> snapshotSupplier = new UncheckedAutoCloseableSupplier<OmSnapshot>() {
+      private final OmMetadataManagerImpl snapshotMetadataManager =
+          OmMetadataManagerImpl.createCheckpointMetadataManager(configuration, snapshotCheckpointLocation, false);
+
+      @Override
+      public OmSnapshot get() {
+        OmSnapshot snapshot = mock(OmSnapshot.class);
+        when(snapshot.getMetadataManager()).thenReturn(snapshotMetadataManager);
+        return snapshot;
+      }
+
+      @Override
+      public void close() {
+        try {
+          snapshotMetadataManager.close();
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+    };
+
+    when(omSnapshotManager.getActiveSnapshot(eq(snapshotInfo.getVolumeName()), eq(snapshotInfo.getBucketName()),
+        eq(snapshotInfo.getName()))).thenReturn(snapshotSupplier);
+    try (OmMetadataManagerImpl result = defragService.createCheckpoint(snapshotInfo, incrementalTables)) {
+      try (OmMetadataManagerImpl originalSnapshotStore =
+               OmMetadataManagerImpl.createCheckpointMetadataManager(configuration, snapshotCheckpointLocation)) {
+        assertContents(tableContents, originalSnapshotStore.getStore());
+      } catch (Throwable e) {
+        throw new RuntimeException("Failed to load original snapshot store", e);
+      }
+      // Ensure non-incremental tables are cleared.
+      tableContents.entrySet().stream()
+          .filter(e -> !incrementalTables.contains(e.getKey()))
+          .forEach(e -> e.getValue().clear());
+      assertContents(tableContents, result.getStore());
+    }
+  }
+
+  private void assertContents(Map<String, Map<String, String>> contents, Path path) throws IOException {
+    DBCheckpoint dbCheckpoint = new RocksDBCheckpoint(path);
+    try (OmMetadataManagerImpl metadataManager = OmMetadataManagerImpl.createCheckpointMetadataManager(configuration,
+        dbCheckpoint, true)) {
+      assertContents(contents, metadataManager.getStore());
+    }
+  }
+
+  @Test
+  public void testAtomicSwitchSnapshotDB() throws Exception {
+    UUID snapshotId = UUID.randomUUID();
+    Path checkpointPath = tempDir.resolve("checkpoint");
+    Map<String, Map<String, String>> checkpointContent = createTableContents(checkpointPath, "_cp1_");
+    WritableOmSnapshotLocalDataProvider provider = mock(WritableOmSnapshotLocalDataProvider.class);
+    OmSnapshotLocalData localData = mock(OmSnapshotLocalData.class);
+
+    when(snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapshotId)).thenReturn(provider);
+    when(provider.getSnapshotLocalData()).thenReturn(localData);
+    AtomicInteger version = new AtomicInteger(1);
+    when(localData.getVersion()).thenAnswer(i -> version.get());
+    doAnswer(i -> {
+      version.incrementAndGet();
+      return null;
+    }).when(provider).addSnapshotVersion(any());
+
+    Path nextVersionPath = tempDir.resolve(snapshotId + "v2").toAbsolutePath();
+    try (MockedStatic<OmSnapshotManager> mockedStatic = Mockito.mockStatic(OmSnapshotManager.class,
+        CALLS_REAL_METHODS)) {
+      mockedStatic.when(() -> OmSnapshotManager.getSnapshotPath(eq(metadataManager), eq(snapshotId), eq(2)))
+          .thenReturn(nextVersionPath);
+      Map<String, Map<String, String>> existingVersionContents = createTableContents(nextVersionPath, "_cp2_");
+      assertNotEquals(existingVersionContents, checkpointContent);
+      assertContents(existingVersionContents, nextVersionPath);
+      int result = defragService.atomicSwitchSnapshotDB(snapshotId, checkpointPath);
+      assertContents(checkpointContent, nextVersionPath);
+      assertEquals(1, result);
+      assertEquals(2, version.get());
+    }
+    assertNull(verify(provider).getSnapshotLocalData());

Review Comment:
   This is to fix a findbugs warning: the return value of the verified call would otherwise be flagged as ignored.
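   For context, a minimal sketch of the pattern (the `Greeter` interface and test class here are hypothetical, not from this PR): Mockito's `verify(mock)` returns the mock itself, and invoking a method on that proxy performs the verification while returning the type's default value (null for reference types), so wrapping the call in `assertNull(...)` consumes the return value that findbugs would otherwise flag as ignored.

   ```java
   import static org.junit.jupiter.api.Assertions.assertNull;
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.verify;

   import org.junit.jupiter.api.Test;

   class VerifyReturnValueTest {
     interface Greeter {
       String greet();
     }

     @Test
     void verifiedCallReturnsDefaultValue() {
       Greeter greeter = mock(Greeter.class);
       greeter.greet();
       // A bare `verify(greeter).greet();` leaves the proxy's return value unused,
       // which findbugs can flag; asserting on the (null) result consumes it while
       // still verifying that greet() was invoked.
       assertNull(verify(greeter).greet());
     }
   }
   ```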



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

