Copilot commented on code in PR #9393: URL: https://github.com/apache/ozone/pull/9393#discussion_r2590776754
########## hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import jakarta.annotation.Nonnull; +import java.nio.ByteBuffer; + +/** + * A concrete implementation of the Codec interface for the CodecBuffer type. + * This class provides methods to serialize and deserialize CodecBuffer + * objects to and from byte arrays and to interact with CodecBuffer instances + * using different allocation strategies (direct or heap). + * + * The CodecBufferCodec distinguishes between direct and non-direct + * (heap-based) buffers, ensuring compatibility between the provided allocator + * and buffer type during serialization and deserialization. + * + * This codec supports CodecBuffer-based methods. + * NOTE: This codec does not create copies of CodecBuffer objects and it returns the CodecBuffer object itself + * consumer of this codec and thus the caller should not close the CodecBuffer object in case of + * {@link #copyObject(CodecBuffer)} and {@link #toCodecBuffer(CodecBuffer, CodecBuffer.Allocator)} methods. This has + * been done to avoid unnecessary memory allocations. + * However, it still has to handle lifecyle of CodecBuffer returned by {@link #fromPersistedFormat(byte[])} method. Review Comment: Spelling error: "lifecyle" should be "lifecycle" ```suggestion * However, it still has to handle lifecycle of CodecBuffer returned by {@link #fromPersistedFormat(byte[])} method. ``` ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java: ########## @@ -287,7 +289,8 @@ void performFullDefragmentation(DBStore checkpointDBStore, TablePrefixInfo prefi // Compact the table completely with kForce to get rid of tombstones. try (ManagedCompactRangeOptions compactRangeOptions = new ManagedCompactRangeOptions()) { compactRangeOptions.setBottommostLevelCompaction(ManagedCompactRangeOptions.BottommostLevelCompaction.kForce); - compactRangeOptions.setExclusiveManualCompaction(true); + // Need to allow the range tombstones to change levels and get propogated to bottommost level. Review Comment: Spelling error: "propogated" should be "propagated" ```suggestion // Need to allow the range tombstones to change levels and get propagated to bottommost level. ``` ########## hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestSnapshotDefragService.java: ########## @@ -195,11 +287,646 @@ public void testNeedsDefragmentationRequiresDefrag() throws IOException { verify(provider).close(); } + private Map<String, InMemoryTestTable<String, CodecBuffer>> createTables(String... 
tableNames) { + return createTables(dummyTableValues, tableNames); + } + + private Map<String, InMemoryTestTable<String, CodecBuffer>> createTables( + Map<String, CodecBuffer> tableValues, String... tableNames) { + Map<String, InMemoryTestTable<String, CodecBuffer>> tables = new HashMap<>(); + for (String tableName : tableNames) { + tables.put(tableName, new StringInMemoryTestTable<>(tableValues, tableName)); + } + return tables; + } + + @Test + public void testPerformFullDefragmentation() throws Exception { + DBStore checkpointDBStore = mock(DBStore.class); + Map<String, InMemoryTestTable<String, CodecBuffer>> tableMap = createTables("cf1", "cf2", "cf3"); + TablePrefixInfo prefixInfo = new TablePrefixInfo(ImmutableMap.of("cf1", "ab", "cf2", "cd", + "cf3", "ef")); + Map<String, Map<String, CodecBuffer>> tablesCompacted = new HashMap<>(); + Set<String> incrementalTables = Stream.of("cf1", "cf2").collect(Collectors.toSet()); + when(checkpointDBStore.getTable(anyString(), eq(StringCodec.get()), eq(CodecBufferCodec.get(true)))) + .thenAnswer(i -> { + String tableName = i.getArgument(0, String.class); + return tableMap.getOrDefault(tableName, null); + }); + doAnswer(i -> { + String tableName = i.getArgument(0, String.class); + Map<String, CodecBuffer> table = new HashMap<>(tableMap.get(tableName).getMap()); + tablesCompacted.putIfAbsent(tableName, table); + return null; + + }).when(checkpointDBStore).compactTable(anyString(), any()); + + defragService.performFullDefragmentation(checkpointDBStore, prefixInfo, incrementalTables); + assertEquals(2, tablesCompacted.size()); + for (Map.Entry<String, Map<String, CodecBuffer>> compactedTable : tablesCompacted.entrySet()) { + String prefix = prefixInfo.getTablePrefix(compactedTable.getKey()); + Map<String, String> compactedStringTable = compactedTable.getValue().entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> getFromCodecBuffer(e.getValue()))); + Map<String, String> expectedValue = dummyTableValues.entrySet().stream() + .filter(e -> e.getKey().startsWith(prefix)) + .collect(Collectors.toMap(Map.Entry::getKey, e -> getFromCodecBuffer(e.getValue()))); + assertEquals(expectedValue, compactedStringTable); + assertEquals(expectedValue, tableMap.get(compactedTable.getKey()).getMap().entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> getFromCodecBuffer(e.getValue())))); + } + for (Map.Entry<String, InMemoryTestTable<String, CodecBuffer>> nonCompactedTable : tableMap.entrySet()) { + if (!tablesCompacted.containsKey(nonCompactedTable.getKey())) { + assertEquals(dummyTableValues, nonCompactedTable.getValue().getMap()); + } + } + } + + @Test + public void testIngestNonIncrementalTables() throws Exception { + DBStore checkpointDBStore = mock(DBStore.class); + DBStore snapshotDBStore = mock(DBStore.class); + SnapshotInfo snapshotInfo = createMockSnapshotInfo(UUID.randomUUID(), "vol1", "bucket1", "snap1"); + TablePrefixInfo prefixInfo = new TablePrefixInfo(ImmutableMap.of("cf1", "ab", "cf2", "cd", + "cf3", "ef", "cf4", "")); + Set<String> incrementalTables = Stream.of("cf1", "cf2").collect(Collectors.toSet()); + Map<String, String> dumpedFileName = new HashMap<>(); + Map<String, Table<CodecBuffer, CodecBuffer>> snapshotTables = Stream.of("cf1", "cf2", "cf3", "cf4", "cf5") + .map(name -> { + Table<CodecBuffer, CodecBuffer> table = mock(Table.class); + when(table.getName()).thenReturn(name); + try { + doAnswer(i -> { + CodecBuffer prefixBytes = i.getArgument(1) == null ? 
null : i.getArgument(1, CodecBuffer.class); + String prefix = prefixBytes == null ? "" : StringCodec.get().fromCodecBuffer(prefixBytes); + assertEquals(prefixInfo.getTablePrefix(name), prefix); + dumpedFileName.put(name, i.getArgument(0, File.class).toPath().toAbsolutePath().toString()); + return null; + }).when(table).dumpToFileWithPrefix(any(File.class), any()); + } catch (Exception e) { + throw new RuntimeException(e); + } + return table; + }).collect(Collectors.toMap(Table::getName, Function.identity())); + Map<String, String> ingestedFiles = new HashMap<>(); + Map<String, Table> checkpointTables = Stream.of("cf3", "cf4", "cf5") + .map(name -> { + Table<CodecBuffer, CodecBuffer> table = mock(Table.class); + when(table.getName()).thenReturn(name); + try { + doAnswer(i -> { + File file = i.getArgument(0, File.class); + ingestedFiles.put(name, file.toPath().toAbsolutePath().toString()); + return null; + }).when(table).loadFromFile(any(File.class)); + } catch (RocksDatabaseException e) { + throw new RuntimeException(e); + } + return table; + }).collect(Collectors.toMap(Table::getName, Function.identity())); + + OmSnapshot snapshot = mock(OmSnapshot.class); + OmMetadataManagerImpl snapshotMetadataManager = mock(OmMetadataManagerImpl.class); + UncheckedAutoCloseableSupplier<OmSnapshot> snapshotSupplier = new UncheckedAutoCloseableSupplier<OmSnapshot>() { + @Override + public OmSnapshot get() { + return snapshot; + } + + @Override + public void close() { + + } + }; + + when(omSnapshotManager.getActiveSnapshot(anyString(), anyString(), anyString())).thenReturn(snapshotSupplier); + when(snapshot.getMetadataManager()).thenReturn(snapshotMetadataManager); + when(snapshotMetadataManager.getStore()).thenReturn(snapshotDBStore); + List<Table<?, ?>> snapshotTableList = new ArrayList<>(snapshotTables.values()); + when(snapshotDBStore.listTables()).thenReturn(snapshotTableList); + + doAnswer(i -> { + String tableName = i.getArgument(0, String.class); + return snapshotTables.get(tableName); + }).when(snapshotDBStore).getTable(anyString(), eq(CodecBufferCodec.get(true)), eq(CodecBufferCodec.get(true))); + doAnswer(i -> { + String tableName = i.getArgument(0, String.class); + return checkpointTables.get(tableName); + }).when(checkpointDBStore).getTable(anyString(), eq(CodecBufferCodec.get(true)), eq(CodecBufferCodec.get(true))); + + defragService.ingestNonIncrementalTables(checkpointDBStore, snapshotInfo, prefixInfo, incrementalTables); + assertEquals(checkpointTables.keySet(), dumpedFileName.keySet()); + assertEquals(dumpedFileName, ingestedFiles); + } + + private void assertContents(Map<String, Map<String, String>> tableContents, DBStore dbStore) + throws RocksDatabaseException, CodecException { + for (String tableName : dbStore.getTableNames().values()) { + Table<String, String> table = dbStore.getTable(tableName, StringCodec.get(), StringCodec.get()); + try (Table.KeyValueIterator<String, String> iterator = table.iterator()) { + Map<String, String> expectedContents = tableContents.get(tableName); + Map<String, String> actualContents = new HashMap<>(); + while (iterator.hasNext()) { + Table.KeyValue<String, String> kv = iterator.next(); + actualContents.put(kv.getKey(), kv.getValue()); + } + assertNotNull(expectedContents, "Expected contents for table " + tableName + " is null"); + assertEquals(expectedContents, actualContents, "Table contents mismatch for table " + tableName); + } + } + } + + private static Stream<Arguments> testCreateCheckpointCases() { + // Have random tables to be incremental to 
ensure content gets preserved. + return Stream.of( + Arguments.of(ImmutableSet.of(KEY_TABLE, BUCKET_TABLE, DIRECTORY_TABLE)), + Arguments.of(ImmutableSet.of(FILE_TABLE, DIRECTORY_TABLE, KEY_TABLE)), + Arguments.of(ImmutableSet.of(VOLUME_TABLE, BUCKET_TABLE, DELEGATION_TOKEN_TABLE)) + ); + } + + private Map<String, Map<String, String>> createTableContents(Path path, String keyPrefix) throws IOException { + DBCheckpoint snapshotCheckpointLocation = new RocksDBCheckpoint(path); + Map<String, Map<String, String>> tableContents = new HashMap<>(); + try (OmMetadataManagerImpl metadataManager = OmMetadataManagerImpl.createCheckpointMetadataManager(configuration, + snapshotCheckpointLocation, false)) { + Set<String> metadataManagerTables = new HashSet<>(metadataManager.listTableNames()); + for (String tableName : metadataManager.getStore().getTableNames().values()) { + if (metadataManagerTables.contains(tableName)) { + Table<String, String> table = metadataManager.getStore().getTable(tableName, + StringCodec.get(), StringCodec.get()); + for (int i = 0; i < 10; i++) { + String key = tableName + keyPrefix + i; + String value = "value_" + i; + table.put(key, value); + tableContents.computeIfAbsent(tableName, k -> new HashMap<>()).put(key, value); + } + } else { + tableContents.put(tableName, Collections.emptyMap()); + } + } + } + return tableContents; + } + + @ParameterizedTest + @MethodSource("testCreateCheckpointCases") + public void testCreateCheckpoint(Set<String> incrementalTables) throws Exception { + SnapshotInfo snapshotInfo = createMockSnapshotInfo(UUID.randomUUID(), "vol1", "bucket1", "snap1"); + DBCheckpoint snapshotCheckpointLocation = + new RocksDBCheckpoint(tempDir.resolve(snapshotInfo.getSnapshotId().toString())); + Map<String, Map<String, String>> tableContents = + createTableContents(snapshotCheckpointLocation.getCheckpointLocation(), "_key_"); + UncheckedAutoCloseableSupplier<OmSnapshot> snapshotSupplier = new UncheckedAutoCloseableSupplier<OmSnapshot>() { + private final OmMetadataManagerImpl snapshotMetadataManager = + OmMetadataManagerImpl.createCheckpointMetadataManager(configuration, snapshotCheckpointLocation, false); + + @Override + public OmSnapshot get() { + OmSnapshot snapshot = mock(OmSnapshot.class); + when(snapshot.getMetadataManager()).thenReturn(snapshotMetadataManager); + return snapshot; + } + + @Override + public void close() { + try { + snapshotMetadataManager.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + }; + + when(omSnapshotManager.getActiveSnapshot(eq(snapshotInfo.getVolumeName()), eq(snapshotInfo.getBucketName()), + eq(snapshotInfo.getName()))).thenReturn(snapshotSupplier); + try (OmMetadataManagerImpl result = defragService.createCheckpoint(snapshotInfo, incrementalTables)) { + try (OmMetadataManagerImpl originalSnapshotStore = + OmMetadataManagerImpl.createCheckpointMetadataManager(configuration, snapshotCheckpointLocation)) { + assertContents(tableContents, originalSnapshotStore.getStore()); + } catch (Throwable e) { + throw new RuntimeException("Failed to load original snapshot store", e); + } + // Ensure non-incremental tables are cleared. 
+ tableContents.entrySet().stream() + .filter(e -> !incrementalTables.contains(e.getKey())) + .forEach(e -> e.getValue().clear()); + assertContents(tableContents, result.getStore()); + } + } + + private void assertContents(Map<String, Map<String, String>> contents, Path path) throws IOException { + DBCheckpoint dbCheckpoint = new RocksDBCheckpoint(path); + try (OmMetadataManagerImpl metadataManager = OmMetadataManagerImpl.createCheckpointMetadataManager(configuration, + dbCheckpoint, true)) { + assertContents(contents, metadataManager.getStore()); + } + } + + @Test + public void testAtomicSwitchSnapshotDB() throws Exception { + UUID snapshotId = UUID.randomUUID(); + Path checkpointPath = tempDir.resolve("checkpoint"); + Map<String, Map<String, String>> checkpointContent = createTableContents(checkpointPath, "_cp1_"); + WritableOmSnapshotLocalDataProvider provider = mock(WritableOmSnapshotLocalDataProvider.class); + OmSnapshotLocalData localData = mock(OmSnapshotLocalData.class); + + when(snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapshotId)).thenReturn(provider); + when(provider.getSnapshotLocalData()).thenReturn(localData); + AtomicInteger version = new AtomicInteger(1); + when(localData.getVersion()).thenAnswer(i -> version.get()); + doAnswer(i -> { + version.incrementAndGet(); + return null; + }).when(provider).addSnapshotVersion(any()); + + Path nextVersionPath = tempDir.resolve(snapshotId + "v2").toAbsolutePath(); + try (MockedStatic<OmSnapshotManager> mockedStatic = Mockito.mockStatic(OmSnapshotManager.class, + CALLS_REAL_METHODS)) { + mockedStatic.when(() -> OmSnapshotManager.getSnapshotPath(eq(metadataManager), eq(snapshotId), eq(2))) + .thenReturn(nextVersionPath); + Map<String, Map<String, String>> existingVersionContents = createTableContents(nextVersionPath, "_cp2_"); + assertNotEquals(existingVersionContents, checkpointContent); + assertContents(existingVersionContents, nextVersionPath); + int result = defragService.atomicSwitchSnapshotDB(snapshotId, checkpointPath); + assertContents(checkpointContent, nextVersionPath); + assertEquals(1, result); + assertEquals(2, version.get()); + } + assertNull(verify(provider).getSnapshotLocalData()); Review Comment: Incorrect assertion usage. `assertNull(verify(provider).getSnapshotLocalData())` asserts on the value returned by a call made through `verify()`, but `verify()` returns the mock in verification mode for chaining, not the method's real return value. This should likely be `verify(provider).getSnapshotLocalData()` without `assertNull`; if the intent is to assert that the provider returned null, stub it with `when(...).thenReturn(null)`, assert on a real call, and verify the interaction separately. As written, the assertion is vacuous: a call made in verification mode returns a default value (`null` for reference types), so `assertNull` always passes without checking anything. ```suggestion verify(provider).getSnapshotLocalData(); ``` ########## hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBufferCodec.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import jakarta.annotation.Nonnull; +import java.nio.ByteBuffer; + +/** + * A concrete implementation of the Codec interface for the CodecBuffer type. + * This class provides methods to serialize and deserialize CodecBuffer + * objects to and from byte arrays and to interact with CodecBuffer instances + * using different allocation strategies (direct or heap). + * + * The CodecBufferCodec distinguishes between direct and non-direct + * (heap-based) buffers, ensuring compatibility between the provided allocator + * and buffer type during serialization and deserialization. + * + * This codec supports CodecBuffer-based methods. + * NOTE: This codec does not create copies of CodecBuffer objects and it returns the CodecBuffer object itself + * consumer of this codec and thus the caller should not close the CodecBuffer object in case of Review Comment: Incomplete sentence in javadoc. The sentence "consumer of this codec and thus the caller should not close..." appears to be missing the beginning of the thought. It should likely read: "This codec does not create copies and returns the CodecBuffer object itself **to the** consumer of this codec..." ```suggestion * NOTE: This codec does not create copies of CodecBuffer objects and it returns the CodecBuffer object itself to the * consumer of this codec, and thus the caller should not close the CodecBuffer object in case of ```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
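On the `assertNull(verify(provider).getSnapshotLocalData())` comment above: the distinction between verifying an interaction and asserting on a stubbed return value is easy to get backwards, so here is a minimal, self-contained sketch of both patterns. `LocalDataProvider` and `LocalData` are illustrative stand-ins, not the Ozone classes touched by the PR.

```java
// Minimal sketch of the two Mockito patterns discussed above.
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.junit.jupiter.api.Test;

class MockitoVerifyPatternsTest {
  // Illustrative stand-ins for the provider/local-data types used in the PR's test.
  interface LocalData { }
  interface LocalDataProvider {
    LocalData getSnapshotLocalData();
  }

  @Test
  void verifyInteractionOnly() {
    LocalDataProvider provider = mock(LocalDataProvider.class);
    LocalData data = mock(LocalData.class);
    when(provider.getSnapshotLocalData()).thenReturn(data);

    // Exercise the mock the way production code would.
    assertSame(data, provider.getSnapshotLocalData());

    // Correct: verify the interaction. Do not assert on the value returned by the call
    // made through verify(...): that call runs in verification mode and yields a default null.
    verify(provider).getSnapshotLocalData();
  }

  @Test
  void assertOnStubbedValue() {
    LocalDataProvider provider = mock(LocalDataProvider.class);
    // If the point is "the provider returned null", stub it explicitly...
    when(provider.getSnapshotLocalData()).thenReturn(null);

    // ...assert on a real call...
    assertNull(provider.getSnapshotLocalData());

    // ...and verify the interaction separately.
    verify(provider).getSnapshotLocalData();
  }
}
```

If the test only cares that `getSnapshotLocalData()` was invoked, the bare `verify(provider).getSnapshotLocalData();` from the suggestion block is sufficient.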

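On the `SnapshotDefragService` hunk above that compacts each table with `BottommostLevelCompaction.kForce`: the intent is that range tombstones get pushed down and dropped at the bottommost level instead of lingering after defragmentation. Below is a rough sketch of the same knobs against the plain `org.rocksdb` API; Ozone wraps these in `ManagedCompactRangeOptions`, and the database path, the use of the default column family, and the explicit non-exclusive setting are illustrative assumptions rather than code from the PR.

```java
// Sketch: force a manual compaction down to the bottommost level so tombstones are dropped.
import org.rocksdb.CompactRangeOptions;
import org.rocksdb.CompactRangeOptions.BottommostLevelCompaction;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public final class ForceBottommostCompaction {
  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    // "/tmp/defrag-demo" is an illustrative path, not taken from the PR.
    try (Options options = new Options().setCreateIfMissing(true);
         RocksDB db = RocksDB.open(options, "/tmp/defrag-demo");
         CompactRangeOptions compactOptions = new CompactRangeOptions()) {
      // kForce recompacts the bottommost level even when RocksDB would otherwise skip it,
      // which is what lets accumulated (range) tombstones be removed.
      compactOptions.setBottommostLevelCompaction(BottommostLevelCompaction.kForce);
      // The PR removes the explicit setExclusiveManualCompaction(true); non-exclusive manual
      // compaction lets automatic compactions keep moving tombstones down through the levels.
      compactOptions.setExclusiveManualCompaction(false);
      // null begin/end keys compact the entire key range of the column family.
      db.compactRange(db.getDefaultColumnFamily(), null, null, compactOptions);
    }
  }
}
```

With `kForce`, the bottommost level is rewritten even when it already contains a single sorted run, which is what actually purges the tombstones produced by the preceding deletes.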