smengcl commented on code in PR #5165:
URL: https://github.com/apache/ozone/pull/5165#discussion_r1290737673
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdb/util/TestManagedSstFileReader.java:
##########
@@ -91,65 +95,131 @@ private Map<String, Integer> createKeys(int startRange,
int endRange) {
i -> i % 2));
}
- private Pair<Map<String, Integer>, List<String>> createDummyData(
+ private Pair<TreeMap<String, Integer>, List<String>> createDummyData(
int numberOfFiles) throws RocksDBException, IOException {
List<String> files = new ArrayList<>();
int numberOfKeysPerFile = 1000;
- Map<String, Integer> keys = new HashMap<>();
+ TreeMap<String, Integer> keys =
+ new TreeMap<>(createKeys(0, numberOfKeysPerFile * numberOfFiles));
+ List<TreeMap<String, Integer>> fileKeysList =
+ IntStream.range(0, numberOfFiles)
+ .mapToObj(i -> new TreeMap<String, Integer>())
+ .collect(Collectors.toList());
int cnt = 0;
- for (int i = 0; i < numberOfFiles; i++) {
- TreeMap<String, Integer> fileKeys = new TreeMap<>(createKeys(cnt,
- cnt + numberOfKeysPerFile));
- cnt += fileKeys.size();
+ for (Map.Entry<String, Integer> kv : keys.entrySet()) {
+ fileKeysList.get(cnt % numberOfFiles).put(kv.getKey(), kv.getValue());
+ cnt += 1;
+ }
+ for (TreeMap<String, Integer> fileKeys : fileKeysList) {
String tmpSSTFile = createRandomSSTFile(fileKeys);
files.add(tmpSSTFile);
- keys.putAll(fileKeys);
}
return Pair.of(keys, files);
}
+ public String getLexicographicallyLowerString(String val) {
+ char[] charVal = val.toCharArray();
+ charVal[charVal.length -1] -= 1;
+ return String.valueOf(charVal);
+ }
+
+ public String getLexicographicallyHigherString(String val) {
+ char[] charVal = val.toCharArray();
+ charVal[charVal.length -1] += 1;
+ return String.valueOf(charVal);
+ }
+
+ private List<Optional<String>> getTestingBounds(
+ TreeMap<String, Integer> keys) {
+ Set<String> boundary = new HashSet<>();
+ if (keys.size() > 0) {
+ List<String> sortedKeys = new ArrayList<>(keys.keySet());
+ boundary.add(getLexicographicallyLowerString(keys.firstKey()));
+ boundary.add(keys.firstKey());
+ for (int i = 1; i <= 10; i++) {
+ boundary.add(sortedKeys.get((i * keys.size() / 10) -1));
+ }
+ boundary.add(getLexicographicallyHigherString(keys.lastKey()));
+ }
+ List<Optional<String>> bounds = boundary.stream().map(Optional::of)
+ .collect(Collectors.toList());
+ bounds.add(Optional.empty());
+ return bounds;
+ }
+
@ParameterizedTest
@ValueSource(ints = {0, 1, 2, 3, 7, 10})
public void testGetKeyStream(int numberOfFiles)
throws RocksDBException, IOException {
- Pair<Map<String, Integer>, List<String>> data =
+ Pair<TreeMap<String, Integer>, List<String>> data =
createDummyData(numberOfFiles);
List<String> files = data.getRight();
- Map<String, Integer> keys = data.getLeft();
- try (Stream<String> keyStream =
- new ManagedSstFileReader(files).getKeyStream()) {
- keyStream.forEach(key -> {
- Assertions.assertEquals(keys.get(key), 1);
- keys.remove(key);
- });
- keys.values().forEach(val -> Assertions.assertEquals(0, val));
+ TreeMap<String, Integer> keys = data.getLeft();
+ List<Optional<String>> bounds = getTestingBounds(keys);
+ for (Optional<String> lowerBound : bounds) {
+ for (Optional<String> upperBound : bounds) {
+ Map<String, Integer> keysInBoundary =
+ keys.entrySet().stream().filter(entry -> lowerBound
+ .map(l -> entry.getKey().compareTo(l) >= 0)
+ .orElse(true) &&
+ upperBound.map(u -> entry.getKey().compareTo(u) < 0)
+ .orElse(true))
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ Map.Entry::getValue));
+ try (Stream<String> keyStream =
+ new ManagedSstFileReader(files).getKeyStream(
+ lowerBound, upperBound)) {
+ keyStream.forEach(key -> {
+ Assertions.assertEquals(keysInBoundary.get(key), 1);
+ Assertions.assertNotNull(keysInBoundary.remove(key));
+ });
+ keysInBoundary.values().forEach(val -> Assertions.assertEquals(0,
val));
+ }
+ }
}
}
@Native(ROCKS_TOOLS_NATIVE_LIBRARY_NAME)
@ParameterizedTest
@ValueSource(ints = {0, 1, 2, 3, 7, 10})
- @Disabled("HDDS-8818")
public void testGetKeyStreamWithTombstone(int numberOfFiles)
throws RocksDBException, IOException, NativeLibraryNotLoadedException {
Assumptions.assumeTrue(NativeLibraryLoader.getInstance()
.loadLibrary(ROCKS_TOOLS_NATIVE_LIBRARY_NAME));
- Pair<Map<String, Integer>, List<String>> data =
+ Pair<TreeMap<String, Integer>, List<String>> data =
createDummyData(numberOfFiles);
List<String> files = data.getRight();
- Map<String, Integer> keys = data.getLeft();
+ TreeMap<String, Integer> keys = data.getLeft();
ExecutorService executorService = new ThreadPoolExecutor(0,
- 1, 60, TimeUnit.SECONDS,
+ 2, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), new ThreadFactoryBuilder()
.setNameFormat("snapshot-diff-manager-sst-dump-tool-TID-%d")
.build(), new ThreadPoolExecutor.DiscardPolicy());
ManagedSSTDumpTool sstDumpTool =
new ManagedSSTDumpTool(executorService, 256);
- try (Stream<String> keyStream = new ManagedSstFileReader(files)
- .getKeyStreamWithTombstone(sstDumpTool)) {
- keyStream.forEach(keys::remove);
- Assertions.assertEquals(0, keys.size());
+ List<Optional<String>> bounds = getTestingBounds(keys);
+ try {
+ for (Optional<String> lowerBound : bounds) {
+ for (Optional<String> upperBound : bounds) {
+ Map<String, Integer> keysInBoundary =
+ keys.entrySet().stream().filter(entry -> lowerBound
+ .map(l -> entry.getKey().compareTo(l) >= 0)
+ .orElse(true) &&
+ upperBound.map(u -> entry.getKey().compareTo(u) < 0)
+ .orElse(true))
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ Map.Entry::getValue));
+ try (Stream<String> keyStream = new ManagedSstFileReader(files)
+ .getKeyStreamWithTombstone(sstDumpTool, lowerBound, upperBound))
{
+ keyStream.forEach(
+ key -> {
+ Assertions.assertNotNull(keysInBoundary.remove(key));
+ });
+ }
+ Assertions.assertEquals(0, keysInBoundary.size());
+ }
+ }
Review Comment:
Please add comments around such lambda to clarify the intention for others
to review.
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdb/util/TestManagedSstFileReader.java:
##########
@@ -91,65 +95,131 @@ private Map<String, Integer> createKeys(int startRange,
int endRange) {
i -> i % 2));
}
- private Pair<Map<String, Integer>, List<String>> createDummyData(
+ private Pair<TreeMap<String, Integer>, List<String>> createDummyData(
int numberOfFiles) throws RocksDBException, IOException {
List<String> files = new ArrayList<>();
int numberOfKeysPerFile = 1000;
- Map<String, Integer> keys = new HashMap<>();
+ TreeMap<String, Integer> keys =
+ new TreeMap<>(createKeys(0, numberOfKeysPerFile * numberOfFiles));
+ List<TreeMap<String, Integer>> fileKeysList =
+ IntStream.range(0, numberOfFiles)
+ .mapToObj(i -> new TreeMap<String, Integer>())
+ .collect(Collectors.toList());
int cnt = 0;
- for (int i = 0; i < numberOfFiles; i++) {
- TreeMap<String, Integer> fileKeys = new TreeMap<>(createKeys(cnt,
- cnt + numberOfKeysPerFile));
- cnt += fileKeys.size();
+ for (Map.Entry<String, Integer> kv : keys.entrySet()) {
+ fileKeysList.get(cnt % numberOfFiles).put(kv.getKey(), kv.getValue());
+ cnt += 1;
+ }
+ for (TreeMap<String, Integer> fileKeys : fileKeysList) {
String tmpSSTFile = createRandomSSTFile(fileKeys);
files.add(tmpSSTFile);
- keys.putAll(fileKeys);
}
return Pair.of(keys, files);
}
+ public String getLexicographicallyLowerString(String val) {
+ char[] charVal = val.toCharArray();
+ charVal[charVal.length -1] -= 1;
+ return String.valueOf(charVal);
+ }
+
+ public String getLexicographicallyHigherString(String val) {
+ char[] charVal = val.toCharArray();
+ charVal[charVal.length -1] += 1;
+ return String.valueOf(charVal);
+ }
+
+ private List<Optional<String>> getTestingBounds(
+ TreeMap<String, Integer> keys) {
+ Set<String> boundary = new HashSet<>();
+ if (keys.size() > 0) {
+ List<String> sortedKeys = new ArrayList<>(keys.keySet());
+ boundary.add(getLexicographicallyLowerString(keys.firstKey()));
+ boundary.add(keys.firstKey());
+ for (int i = 1; i <= 10; i++) {
+ boundary.add(sortedKeys.get((i * keys.size() / 10) -1));
+ }
+ boundary.add(getLexicographicallyHigherString(keys.lastKey()));
+ }
+ List<Optional<String>> bounds = boundary.stream().map(Optional::of)
+ .collect(Collectors.toList());
+ bounds.add(Optional.empty());
+ return bounds;
+ }
+
@ParameterizedTest
@ValueSource(ints = {0, 1, 2, 3, 7, 10})
public void testGetKeyStream(int numberOfFiles)
throws RocksDBException, IOException {
- Pair<Map<String, Integer>, List<String>> data =
+ Pair<TreeMap<String, Integer>, List<String>> data =
createDummyData(numberOfFiles);
List<String> files = data.getRight();
- Map<String, Integer> keys = data.getLeft();
- try (Stream<String> keyStream =
- new ManagedSstFileReader(files).getKeyStream()) {
- keyStream.forEach(key -> {
- Assertions.assertEquals(keys.get(key), 1);
- keys.remove(key);
- });
- keys.values().forEach(val -> Assertions.assertEquals(0, val));
+ TreeMap<String, Integer> keys = data.getLeft();
+ List<Optional<String>> bounds = getTestingBounds(keys);
+ for (Optional<String> lowerBound : bounds) {
+ for (Optional<String> upperBound : bounds) {
+ Map<String, Integer> keysInBoundary =
+ keys.entrySet().stream().filter(entry -> lowerBound
+ .map(l -> entry.getKey().compareTo(l) >= 0)
+ .orElse(true) &&
+ upperBound.map(u -> entry.getKey().compareTo(u) < 0)
+ .orElse(true))
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ Map.Entry::getValue));
+ try (Stream<String> keyStream =
+ new ManagedSstFileReader(files).getKeyStream(
+ lowerBound, upperBound)) {
+ keyStream.forEach(key -> {
+ Assertions.assertEquals(keysInBoundary.get(key), 1);
+ Assertions.assertNotNull(keysInBoundary.remove(key));
+ });
+ keysInBoundary.values().forEach(val -> Assertions.assertEquals(0,
val));
+ }
+ }
}
}
@Native(ROCKS_TOOLS_NATIVE_LIBRARY_NAME)
@ParameterizedTest
@ValueSource(ints = {0, 1, 2, 3, 7, 10})
- @Disabled("HDDS-8818")
public void testGetKeyStreamWithTombstone(int numberOfFiles)
throws RocksDBException, IOException, NativeLibraryNotLoadedException {
Assumptions.assumeTrue(NativeLibraryLoader.getInstance()
.loadLibrary(ROCKS_TOOLS_NATIVE_LIBRARY_NAME));
- Pair<Map<String, Integer>, List<String>> data =
+ Pair<TreeMap<String, Integer>, List<String>> data =
createDummyData(numberOfFiles);
List<String> files = data.getRight();
- Map<String, Integer> keys = data.getLeft();
+ TreeMap<String, Integer> keys = data.getLeft();
ExecutorService executorService = new ThreadPoolExecutor(0,
- 1, 60, TimeUnit.SECONDS,
+ 2, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), new ThreadFactoryBuilder()
.setNameFormat("snapshot-diff-manager-sst-dump-tool-TID-%d")
.build(), new ThreadPoolExecutor.DiscardPolicy());
ManagedSSTDumpTool sstDumpTool =
new ManagedSSTDumpTool(executorService, 256);
- try (Stream<String> keyStream = new ManagedSstFileReader(files)
- .getKeyStreamWithTombstone(sstDumpTool)) {
- keyStream.forEach(keys::remove);
- Assertions.assertEquals(0, keys.size());
+ List<Optional<String>> bounds = getTestingBounds(keys);
+ try {
+ for (Optional<String> lowerBound : bounds) {
+ for (Optional<String> upperBound : bounds) {
+ Map<String, Integer> keysInBoundary =
+ keys.entrySet().stream().filter(entry -> lowerBound
+ .map(l -> entry.getKey().compareTo(l) >= 0)
+ .orElse(true) &&
+ upperBound.map(u -> entry.getKey().compareTo(u) < 0)
+ .orElse(true))
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ Map.Entry::getValue));
+ try (Stream<String> keyStream = new ManagedSstFileReader(files)
+ .getKeyStreamWithTombstone(sstDumpTool, lowerBound, upperBound))
{
+ keyStream.forEach(
+ key -> {
+ Assertions.assertNotNull(keysInBoundary.remove(key));
+ });
+ }
+ Assertions.assertEquals(0, keysInBoundary.size());
+ }
+ }
Review Comment:
Please add comments around such lambdas to clarify the intention for others
to review.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]