Copilot commented on code in PR #9393:
URL: https://github.com/apache/ozone/pull/9393#discussion_r2590776754


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import jakarta.annotation.Nonnull;
+import java.nio.ByteBuffer;
+
+/**
+ * A concrete implementation of the Codec interface for the CodecBuffer type.
+ * This class provides methods to serialize and deserialize CodecBuffer
+ * objects to and from byte arrays and to interact with CodecBuffer instances
+ * using different allocation strategies (direct or heap).
+ *
+ * The CodecBufferCodec distinguishes between direct and non-direct
+ * (heap-based) buffers, ensuring compatibility between the provided allocator
+ * and buffer type during serialization and deserialization.
+ *
+ * This codec supports CodecBuffer-based methods.
+ * NOTE: This codec does not create copies of CodecBuffer objects and it returns the CodecBuffer object itself
+ * consumer of this codec and thus the caller should not close the CodecBuffer object in case of
+ * {@link #copyObject(CodecBuffer)} and {@link #toCodecBuffer(CodecBuffer, CodecBuffer.Allocator)} methods. This has
+ * been done to avoid unnecessary memory allocations.
+ * However, it still has to handle lifecyle of CodecBuffer returned by {@link #fromPersistedFormat(byte[])} method.

Review Comment:
   Spelling error: "lifecyle" should be "lifecycle"
   ```suggestion
    * However, it still has to handle lifecycle of CodecBuffer returned by {@link #fromPersistedFormat(byte[])} method.
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java:
##########
@@ -287,7 +289,8 @@ void performFullDefragmentation(DBStore checkpointDBStore, TablePrefixInfo prefi
       // Compact the table completely with kForce to get rid of tombstones.
       try (ManagedCompactRangeOptions compactRangeOptions = new ManagedCompactRangeOptions()) {
         compactRangeOptions.setBottommostLevelCompaction(ManagedCompactRangeOptions.BottommostLevelCompaction.kForce);
-        compactRangeOptions.setExclusiveManualCompaction(true);
+        // Need to allow the range tombstones to change levels and get propogated to bottommost level.

Review Comment:
   Spelling error: "propogated" should be "propagated"
   ```suggestion
        // Need to allow the range tombstones to change levels and get propagated to bottommost level.
   ```



##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestSnapshotDefragService.java:
##########
@@ -195,11 +287,646 @@ public void testNeedsDefragmentationRequiresDefrag() throws IOException {
     verify(provider).close();
   }
 
+  private Map<String, InMemoryTestTable<String, CodecBuffer>> createTables(String... tableNames) {
+    return createTables(dummyTableValues, tableNames);
+  }
+
+  private Map<String, InMemoryTestTable<String, CodecBuffer>> createTables(
+      Map<String, CodecBuffer> tableValues, String... tableNames) {
+    Map<String, InMemoryTestTable<String, CodecBuffer>> tables = new HashMap<>();
+    for (String tableName : tableNames) {
+      tables.put(tableName, new StringInMemoryTestTable<>(tableValues, tableName));
+    }
+    return tables;
+  }
+
+  @Test
+  public void testPerformFullDefragmentation() throws Exception {
+    DBStore checkpointDBStore = mock(DBStore.class);
+    Map<String, InMemoryTestTable<String, CodecBuffer>> tableMap = createTables("cf1", "cf2", "cf3");
+    TablePrefixInfo prefixInfo = new TablePrefixInfo(ImmutableMap.of("cf1", "ab", "cf2", "cd",
+        "cf3", "ef"));
+    Map<String, Map<String, CodecBuffer>> tablesCompacted = new HashMap<>();
+    Set<String> incrementalTables = Stream.of("cf1", "cf2").collect(Collectors.toSet());
+    when(checkpointDBStore.getTable(anyString(), eq(StringCodec.get()), eq(CodecBufferCodec.get(true))))
+        .thenAnswer(i -> {
+          String tableName = i.getArgument(0, String.class);
+          return tableMap.getOrDefault(tableName, null);
+        });
+    doAnswer(i -> {
+      String tableName = i.getArgument(0, String.class);
+      Map<String, CodecBuffer> table = new HashMap<>(tableMap.get(tableName).getMap());
+      tablesCompacted.putIfAbsent(tableName, table);
+      return null;
+
+    }).when(checkpointDBStore).compactTable(anyString(), any());
+
+    defragService.performFullDefragmentation(checkpointDBStore, prefixInfo, incrementalTables);
+    assertEquals(2, tablesCompacted.size());
+    for (Map.Entry<String, Map<String, CodecBuffer>> compactedTable : tablesCompacted.entrySet()) {
+      String prefix = prefixInfo.getTablePrefix(compactedTable.getKey());
+      Map<String, String> compactedStringTable = compactedTable.getValue().entrySet().stream()
+          .collect(Collectors.toMap(Map.Entry::getKey, e -> getFromCodecBuffer(e.getValue())));
+      Map<String, String> expectedValue = dummyTableValues.entrySet().stream()
+          .filter(e -> e.getKey().startsWith(prefix))
+          .collect(Collectors.toMap(Map.Entry::getKey, e -> getFromCodecBuffer(e.getValue())));
+      assertEquals(expectedValue, compactedStringTable);
+      assertEquals(expectedValue, tableMap.get(compactedTable.getKey()).getMap().entrySet().stream()
+          .collect(Collectors.toMap(Map.Entry::getKey, e -> getFromCodecBuffer(e.getValue()))));
+    }
+    for (Map.Entry<String, InMemoryTestTable<String, CodecBuffer>> nonCompactedTable : tableMap.entrySet()) {
+      if (!tablesCompacted.containsKey(nonCompactedTable.getKey())) {
+        assertEquals(dummyTableValues, nonCompactedTable.getValue().getMap());
+      }
+    }
+  }
+
+  @Test
+  public void testIngestNonIncrementalTables() throws Exception {
+    DBStore checkpointDBStore = mock(DBStore.class);
+    DBStore snapshotDBStore = mock(DBStore.class);
+    SnapshotInfo snapshotInfo = createMockSnapshotInfo(UUID.randomUUID(), "vol1", "bucket1", "snap1");
+    TablePrefixInfo prefixInfo = new TablePrefixInfo(ImmutableMap.of("cf1", "ab", "cf2", "cd",
+        "cf3", "ef", "cf4", ""));
+    Set<String> incrementalTables = Stream.of("cf1", "cf2").collect(Collectors.toSet());
+    Map<String, String> dumpedFileName = new HashMap<>();
+    Map<String, Table<CodecBuffer, CodecBuffer>> snapshotTables = Stream.of("cf1", "cf2", "cf3", "cf4", "cf5")
+        .map(name -> {
+          Table<CodecBuffer, CodecBuffer> table = mock(Table.class);
+          when(table.getName()).thenReturn(name);
+          try {
+            doAnswer(i -> {
+              CodecBuffer prefixBytes = i.getArgument(1) == null ? null : i.getArgument(1, CodecBuffer.class);
+              String prefix = prefixBytes == null ? "" : StringCodec.get().fromCodecBuffer(prefixBytes);
+              assertEquals(prefixInfo.getTablePrefix(name), prefix);
+              dumpedFileName.put(name, i.getArgument(0, File.class).toPath().toAbsolutePath().toString());
+              return null;
+            }).when(table).dumpToFileWithPrefix(any(File.class), any());
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          return table;
+        }).collect(Collectors.toMap(Table::getName, Function.identity()));
+    Map<String, String> ingestedFiles = new HashMap<>();
+    Map<String, Table> checkpointTables = Stream.of("cf3", "cf4", "cf5")
+        .map(name -> {
+          Table<CodecBuffer, CodecBuffer> table = mock(Table.class);
+          when(table.getName()).thenReturn(name);
+          try {
+            doAnswer(i -> {
+              File file = i.getArgument(0, File.class);
+              ingestedFiles.put(name, file.toPath().toAbsolutePath().toString());
+              return null;
+            }).when(table).loadFromFile(any(File.class));
+          } catch (RocksDatabaseException e) {
+            throw new RuntimeException(e);
+          }
+          return table;
+        }).collect(Collectors.toMap(Table::getName, Function.identity()));
+
+    OmSnapshot snapshot = mock(OmSnapshot.class);
+    OmMetadataManagerImpl snapshotMetadataManager = mock(OmMetadataManagerImpl.class);
+    UncheckedAutoCloseableSupplier<OmSnapshot> snapshotSupplier = new UncheckedAutoCloseableSupplier<OmSnapshot>() {
+      @Override
+      public OmSnapshot get() {
+        return snapshot;
+      }
+
+      @Override
+      public void close() {
+
+      }
+    };
+
+    when(omSnapshotManager.getActiveSnapshot(anyString(), anyString(), anyString())).thenReturn(snapshotSupplier);
+    when(snapshot.getMetadataManager()).thenReturn(snapshotMetadataManager);
+    when(snapshotMetadataManager.getStore()).thenReturn(snapshotDBStore);
+    List<Table<?, ?>> snapshotTableList = new ArrayList<>(snapshotTables.values());
+    when(snapshotDBStore.listTables()).thenReturn(snapshotTableList);
+
+    doAnswer(i -> {
+      String tableName = i.getArgument(0, String.class);
+      return snapshotTables.get(tableName);
+    }).when(snapshotDBStore).getTable(anyString(), eq(CodecBufferCodec.get(true)), eq(CodecBufferCodec.get(true)));
+    doAnswer(i -> {
+      String tableName = i.getArgument(0, String.class);
+      return checkpointTables.get(tableName);
+    }).when(checkpointDBStore).getTable(anyString(), eq(CodecBufferCodec.get(true)), eq(CodecBufferCodec.get(true)));
+
+    defragService.ingestNonIncrementalTables(checkpointDBStore, snapshotInfo, prefixInfo, incrementalTables);
+    assertEquals(checkpointTables.keySet(), dumpedFileName.keySet());
+    assertEquals(dumpedFileName, ingestedFiles);
+  }
+
+  private void assertContents(Map<String, Map<String, String>> tableContents, DBStore dbStore)
+      throws RocksDatabaseException, CodecException {
+    for (String tableName : dbStore.getTableNames().values()) {
+      Table<String, String> table = dbStore.getTable(tableName, StringCodec.get(), StringCodec.get());
+      try (Table.KeyValueIterator<String, String> iterator = table.iterator()) {
+        Map<String, String> expectedContents = tableContents.get(tableName);
+        Map<String, String> actualContents = new HashMap<>();
+        while (iterator.hasNext()) {
+          Table.KeyValue<String, String> kv = iterator.next();
+          actualContents.put(kv.getKey(), kv.getValue());
+        }
+        assertNotNull(expectedContents, "Expected contents for table " + tableName + " is null");
+        assertEquals(expectedContents, actualContents, "Table contents mismatch for table " + tableName);
+      }
+    }
+  }
+
+  private static Stream<Arguments> testCreateCheckpointCases() {
+    // Have random tables to be incremental to ensure content gets preserved.
+    return Stream.of(
+        Arguments.of(ImmutableSet.of(KEY_TABLE, BUCKET_TABLE, DIRECTORY_TABLE)),
+        Arguments.of(ImmutableSet.of(FILE_TABLE, DIRECTORY_TABLE, KEY_TABLE)),
+        Arguments.of(ImmutableSet.of(VOLUME_TABLE, BUCKET_TABLE, DELEGATION_TOKEN_TABLE))
+    );
+  }
+
+  private Map<String, Map<String, String>> createTableContents(Path path, String keyPrefix) throws IOException {
+    DBCheckpoint snapshotCheckpointLocation = new RocksDBCheckpoint(path);
+    Map<String, Map<String, String>> tableContents = new HashMap<>();
+    try (OmMetadataManagerImpl metadataManager = OmMetadataManagerImpl.createCheckpointMetadataManager(configuration,
+        snapshotCheckpointLocation, false)) {
+      Set<String> metadataManagerTables = new HashSet<>(metadataManager.listTableNames());
+      for (String tableName : metadataManager.getStore().getTableNames().values()) {
+        if (metadataManagerTables.contains(tableName)) {
+          Table<String, String> table = metadataManager.getStore().getTable(tableName,
+              StringCodec.get(), StringCodec.get());
+          for (int i = 0; i < 10; i++) {
+            String key = tableName + keyPrefix + i;
+            String value = "value_" + i;
+            table.put(key, value);
+            tableContents.computeIfAbsent(tableName, k -> new HashMap<>()).put(key, value);
+          }
+        } else {
+          tableContents.put(tableName, Collections.emptyMap());
+        }
+      }
+    }
+    return tableContents;
+  }
+
+  @ParameterizedTest
+  @MethodSource("testCreateCheckpointCases")
+  public void testCreateCheckpoint(Set<String> incrementalTables) throws Exception {
+    SnapshotInfo snapshotInfo = createMockSnapshotInfo(UUID.randomUUID(), "vol1", "bucket1", "snap1");
+    DBCheckpoint snapshotCheckpointLocation =
+        new RocksDBCheckpoint(tempDir.resolve(snapshotInfo.getSnapshotId().toString()));
+    Map<String, Map<String, String>> tableContents =
+        createTableContents(snapshotCheckpointLocation.getCheckpointLocation(), "_key_");
+    UncheckedAutoCloseableSupplier<OmSnapshot> snapshotSupplier = new UncheckedAutoCloseableSupplier<OmSnapshot>() {
+      private final OmMetadataManagerImpl snapshotMetadataManager =
+          OmMetadataManagerImpl.createCheckpointMetadataManager(configuration, snapshotCheckpointLocation, false);
+
+      @Override
+      public OmSnapshot get() {
+        OmSnapshot snapshot = mock(OmSnapshot.class);
+        when(snapshot.getMetadataManager()).thenReturn(snapshotMetadataManager);
+        return snapshot;
+      }
+
+      @Override
+      public void close() {
+        try {
+          snapshotMetadataManager.close();
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+    };
+
+    when(omSnapshotManager.getActiveSnapshot(eq(snapshotInfo.getVolumeName()), eq(snapshotInfo.getBucketName()),
+        eq(snapshotInfo.getName()))).thenReturn(snapshotSupplier);
+    try (OmMetadataManagerImpl result = defragService.createCheckpoint(snapshotInfo, incrementalTables)) {
+      try (OmMetadataManagerImpl originalSnapshotStore =
+               OmMetadataManagerImpl.createCheckpointMetadataManager(configuration, snapshotCheckpointLocation)) {
+        assertContents(tableContents, originalSnapshotStore.getStore());
+      } catch (Throwable e) {
+        throw new RuntimeException("Failed to load original snapshot store", e);
+      }
+      // Ensure non-incremental tables are cleared.
+      tableContents.entrySet().stream()
+          .filter(e -> !incrementalTables.contains(e.getKey()))
+          .forEach(e -> e.getValue().clear());
+      assertContents(tableContents, result.getStore());
+    }
+  }
+
+  private void assertContents(Map<String, Map<String, String>> contents, Path path) throws IOException {
+    DBCheckpoint dbCheckpoint = new RocksDBCheckpoint(path);
+    try (OmMetadataManagerImpl metadataManager = OmMetadataManagerImpl.createCheckpointMetadataManager(configuration,
+        dbCheckpoint, true)) {
+      assertContents(contents, metadataManager.getStore());
+    }
+  }
+
+  @Test
+  public void testAtomicSwitchSnapshotDB() throws Exception {
+    UUID snapshotId = UUID.randomUUID();
+    Path checkpointPath = tempDir.resolve("checkpoint");
+    Map<String, Map<String, String>> checkpointContent = createTableContents(checkpointPath, "_cp1_");
+    WritableOmSnapshotLocalDataProvider provider = mock(WritableOmSnapshotLocalDataProvider.class);
+    OmSnapshotLocalData localData = mock(OmSnapshotLocalData.class);
+
+    when(snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapshotId)).thenReturn(provider);
+    when(provider.getSnapshotLocalData()).thenReturn(localData);
+    AtomicInteger version = new AtomicInteger(1);
+    when(localData.getVersion()).thenAnswer(i -> version.get());
+    doAnswer(i -> {
+      version.incrementAndGet();
+      return null;
+    }).when(provider).addSnapshotVersion(any());
+
+    Path nextVersionPath = tempDir.resolve(snapshotId + "v2").toAbsolutePath();
+    try (MockedStatic<OmSnapshotManager> mockedStatic = Mockito.mockStatic(OmSnapshotManager.class,
+        CALLS_REAL_METHODS)) {
+      mockedStatic.when(() -> OmSnapshotManager.getSnapshotPath(eq(metadataManager), eq(snapshotId), eq(2)))
+          .thenReturn(nextVersionPath);
+      Map<String, Map<String, String>> existingVersionContents = createTableContents(nextVersionPath, "_cp2_");
+      assertNotEquals(existingVersionContents, checkpointContent);
+      assertContents(existingVersionContents, nextVersionPath);
+      int result = defragService.atomicSwitchSnapshotDB(snapshotId, checkpointPath);
+      assertContents(checkpointContent, nextVersionPath);
+      assertEquals(1, result);
+      assertEquals(2, version.get());
+    }
+    assertNull(verify(provider).getSnapshotLocalData());

Review Comment:
   Incorrect assertion usage. `assertNull(verify(provider).getSnapshotLocalData())` does not check the value returned to the code under test: `verify(provider)` returns the mock in verification mode, and calling `getSnapshotLocalData()` on it only records the interaction check and returns the type's default value (`null`). The `assertNull` is therefore always trivially satisfied and adds no value. This should likely be `verify(provider).getSnapshotLocalData()` without `assertNull`; if you want to assert on the value the mock returned, stub it with `when(...).thenReturn(...)` and assert on the result observed by the code under test.
   ```suggestion
       verify(provider).getSnapshotLocalData();
   ```
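
   For readers less familiar with Mockito, here is a minimal standalone sketch of the two separate concerns (purely hypothetical `Provider`/`LocalData` stand-ins, plain Mockito + JUnit 5, not the Ozone types): the `verify(...)` line checks only that the call happened, while the value check must be made on what the code under test actually received.
   ```java
   import static org.junit.jupiter.api.Assertions.assertSame;
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.verify;
   import static org.mockito.Mockito.when;

   interface LocalData { }

   interface Provider {
     LocalData getSnapshotLocalData();
   }

   class VerifyPatternSketch {
     void demo() {
       Provider provider = mock(Provider.class);
       LocalData data = mock(LocalData.class);
       when(provider.getSnapshotLocalData()).thenReturn(data);

       // The code under test would call the provider; modeled directly here.
       LocalData seen = provider.getSnapshotLocalData();

       // Interaction check only: the call made on the verification proxy returns
       // the type's default value (null), so wrapping it in assertNull proves nothing.
       verify(provider).getSnapshotLocalData();

       // To check the value, assert on what the code under test actually received.
       assertSame(data, seen);
     }
   }
   ```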



##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import jakarta.annotation.Nonnull;
+import java.nio.ByteBuffer;
+
+/**
+ * A concrete implementation of the Codec interface for the CodecBuffer type.
+ * This class provides methods to serialize and deserialize CodecBuffer
+ * objects to and from byte arrays and to interact with CodecBuffer instances
+ * using different allocation strategies (direct or heap).
+ *
+ * The CodecBufferCodec distinguishes between direct and non-direct
+ * (heap-based) buffers, ensuring compatibility between the provided allocator
+ * and buffer type during serialization and deserialization.
+ *
+ * This codec supports CodecBuffer-based methods.
+ * NOTE: This codec does not create copies of CodecBuffer objects and it returns the CodecBuffer object itself
+ * consumer of this codec and thus the caller should not close the CodecBuffer object in case of

Review Comment:
   Incomplete sentence in javadoc. The sentence "consumer of this codec and thus the caller should not close..." appears to be missing the beginning of the thought. It should likely read: "This codec does not create copies and returns the CodecBuffer object itself **to the** consumer of this codec..."
   ```suggestion
    * NOTE: This codec does not create copies of CodecBuffer objects and it returns the CodecBuffer object itself to the
    * consumer of this codec, and thus the caller should not close the CodecBuffer object in case of
   ```
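
   To make the ownership contract described above concrete, here is a small self-contained sketch using purely hypothetical `Buffer`/`ReusingCodec` stand-ins (not the Ozone `CodecBuffer`/`CodecBufferCodec` API); it only mirrors the rules the javadoc states: a "copy" that is really the same instance must not be closed twice, while an object created from persisted bytes is owned by the caller.
   ```java
   import java.nio.charset.StandardCharsets;

   /** Hypothetical stand-in for a buffer whose owner must close it. */
   final class Buffer implements AutoCloseable {
     private final byte[] data;

     Buffer(byte[] data) {
       this.data = data;
     }

     byte[] data() {
       return data;
     }

     @Override
     public void close() {
       // Release underlying resources (no-op in this sketch).
     }
   }

   /** Hypothetical codec mirroring the documented contract. */
   final class ReusingCodec {
     /** Hands back the same instance; the caller must not close it a second time. */
     Buffer copyObject(Buffer buffer) {
       return buffer;
     }

     /** Allocates a new Buffer; the caller owns its lifecycle and must close it. */
     Buffer fromPersistedFormat(byte[] bytes) {
       return new Buffer(bytes.clone());
     }
   }

   class OwnershipDemo {
     public static void main(String[] args) {
       ReusingCodec codec = new ReusingCodec();

       // "Copy" that is really the same object: closing `original` is sufficient.
       try (Buffer original = new Buffer("abc".getBytes(StandardCharsets.UTF_8))) {
         Buffer same = codec.copyObject(original);
         System.out.println(same == original); // true: no extra allocation, no extra close
       }

       // Freshly allocated object: the caller is responsible for closing it.
       try (Buffer decoded = codec.fromPersistedFormat(new byte[] {1, 2, 3})) {
         System.out.println(decoded.data().length); // 3
       }
     }
   }
   ```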



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

