errose28 commented on code in PR #8363:
URL: https://github.com/apache/ozone/pull/8363#discussion_r2093358476
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ReplicasVerify.java:
##########
@@ -95,132 +162,178 @@ void findCandidateKeys(OzoneClient ozoneClient,
OzoneAddress address) throws IOE
String bucketName = address.getBucketName();
String keyName = address.getKeyName();
- ObjectNode root = JsonUtils.createObjectNode(null);
- ArrayNode keysArray = root.putArray("keys");
-
AtomicBoolean allKeysPassed = new AtomicBoolean(true);
+ File outputFile = new File(outputDir, "replicas-verify-result.json");
+
+ try (OutputStream outputStream =
Files.newOutputStream(outputFile.toPath());
+ JsonGenerator jsonGenerator =
JSON_FACTORY.createGenerator(outputStream, JsonEncoding.UTF8)) {
+ // open json
+ jsonGenerator.useDefaultPrettyPrinter();
+ jsonGenerator.writeStartObject();
+ jsonGenerator.writeFieldName("keys");
+ jsonGenerator.writeStartArray();
+ jsonGenerator.flush();
+
+ try (SequenceWriter sequenceWriter = createSequenceWriter(false,
jsonGenerator)) {
+ // Process keys based on the provided address
+ if (!keyName.isEmpty()) {
+ processKey(ozoneClient, volumeName, bucketName, keyName,
sequenceWriter, allKeysPassed);
+ } else if (!bucketName.isEmpty()) {
+ OzoneVolume volume = objectStore.getVolume(volumeName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+ checkBucket(ozoneClient, bucket, sequenceWriter, allKeysPassed);
+ } else if (!volumeName.isEmpty()) {
+ OzoneVolume volume = objectStore.getVolume(volumeName);
+ checkVolume(ozoneClient, volume, sequenceWriter, allKeysPassed);
+ } else {
+ for (Iterator<? extends OzoneVolume> it =
objectStore.listVolumes(null); it.hasNext();) {
+ checkVolume(ozoneClient, it.next(), sequenceWriter, allKeysPassed);
+ }
+ }
+
+ // Wait for all futures to complete
+ CompletableFuture<Void> allOf =
CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[0]));
+ try {
+ allOf.join();
+ } catch (Exception e) {
+ LOG.error("Error during verification", e);
+ }
+ }
- if (!keyName.isEmpty()) {
- processKey(ozoneClient, volumeName, bucketName, keyName, keysArray,
allKeysPassed);
- } else if (!bucketName.isEmpty()) {
- OzoneVolume volume = objectStore.getVolume(volumeName);
- OzoneBucket bucket = volume.getBucket(bucketName);
- checkBucket(ozoneClient, bucket, keysArray, allKeysPassed);
- } else if (!volumeName.isEmpty()) {
- OzoneVolume volume = objectStore.getVolume(volumeName);
- checkVolume(ozoneClient, volume, keysArray, allKeysPassed);
- } else {
- for (Iterator<? extends OzoneVolume> it = objectStore.listVolumes(null);
it.hasNext();) {
- checkVolume(ozoneClient, it.next(), keysArray, allKeysPassed);
+ // close json
+ try {
+ jsonGenerator.writeEndArray();
+ jsonGenerator.writeBooleanField("pass", allKeysPassed.get());
+ jsonGenerator.writeEndObject();
+ } catch (Exception e) {
+ LOG.error("Exception in closing the JSON structure", e);
}
}
- root.put("pass", allKeysPassed.get());
- System.out.println(JsonUtils.toJsonStringWithDefaultPrettyPrinter(root));
}
- void checkVolume(OzoneClient ozoneClient, OzoneVolume volume, ArrayNode
keysArray, AtomicBoolean allKeysPassed)
- throws IOException {
+ void checkVolume(OzoneClient ozoneClient, OzoneVolume volume,
+ SequenceWriter sequenceWriter, AtomicBoolean allKeysPassed) throws
IOException {
for (Iterator<? extends OzoneBucket> it = volume.listBuckets(null);
it.hasNext();) {
OzoneBucket bucket = it.next();
- checkBucket(ozoneClient, bucket, keysArray, allKeysPassed);
+ checkBucket(ozoneClient, bucket, sequenceWriter, allKeysPassed);
}
}
- void checkBucket(OzoneClient ozoneClient, OzoneBucket bucket, ArrayNode
keysArray, AtomicBoolean allKeysPassed)
- throws IOException {
+ void checkBucket(OzoneClient ozoneClient, OzoneBucket bucket,
+ SequenceWriter sequenceWriter, AtomicBoolean allKeysPassed) throws
IOException {
for (Iterator<? extends OzoneKey> it = bucket.listKeys(null);
it.hasNext();) {
OzoneKey key = it.next();
// TODO: Remove this check once HDDS-12094 is fixed
if (!key.getName().endsWith("/")) {
- processKey(ozoneClient, key.getVolumeName(), key.getBucketName(),
key.getName(), keysArray, allKeysPassed);
+ processKey(ozoneClient, key.getVolumeName(), key.getBucketName(),
+ key.getName(), sequenceWriter, allKeysPassed);
}
}
}
- void processKey(OzoneClient ozoneClient, String volumeName, String
bucketName, String keyName,
- ArrayNode keysArray, AtomicBoolean allKeysPassed) throws IOException {
- OmKeyInfo keyInfo = ozoneClient.getProxy().getKeyInfo(
- volumeName, bucketName, keyName, false);
-
- ObjectNode keyNode = JsonUtils.createObjectNode(null);
- keyNode.put("volumeName", volumeName);
- keyNode.put("bucketName", bucketName);
- keyNode.put("name", keyName);
-
- ArrayNode blocksArray = keyNode.putArray("blocks");
- boolean keyPass = true;
-
- for (OmKeyLocationInfo keyLocation :
keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly()) {
- long containerID = keyLocation.getContainerID();
- long localID = keyLocation.getLocalID();
-
- ObjectNode blockNode = blocksArray.addObject();
- blockNode.put("containerID", containerID);
- blockNode.put("blockID", localID);
-
- ArrayNode replicasArray = blockNode.putArray("replicas");
- boolean blockPass = true;
-
- for (DatanodeDetails datanode : keyLocation.getPipeline().getNodes()) {
- ObjectNode replicaNode = replicasArray.addObject();
-
- ObjectNode datanodeNode = replicaNode.putObject("datanode");
- datanodeNode.put("uuid", datanode.getUuidString());
- datanodeNode.put("hostname", datanode.getHostName());
-
- ArrayNode checksArray = replicaNode.putArray("checks");
- boolean replicaPass = true;
- int replicaIndex = keyLocation.getPipeline().getReplicaIndex(datanode);
+ private void processKey(OzoneClient ozoneClient, String volumeName, String
bucketName,
+ String keyName, SequenceWriter sequenceWriter, AtomicBoolean
allKeysPassed) {
+ CompletableFuture<Void> future =
+ CompletableFuture.supplyAsync(() ->
+ verifyKey(ozoneClient, volumeName, bucketName, keyName),
verificationExecutor)
+ .handleAsync((keyResult, throwable) -> {
+ if (throwable != null) {
+ LOG.error("Error verifying key: {}/{}/{}", volumeName,
bucketName, keyName, throwable);
+ return new KeyVerificationResult(volumeName, bucketName,
keyName, new ArrayList<>(), false);
+ }
+ return keyResult;
+ }, verificationExecutor)
+ .thenAcceptAsync(keyResult ->
+ writeVerificationResult(sequenceWriter, allKeysPassed,
keyResult), writerExecutor);
+ allFutures.add(future);
+ }
- for (ReplicaVerifier verifier : replicaVerifiers) {
- BlockVerificationResult result = verifier.verifyBlock(datanode,
keyLocation, replicaIndex);
- ObjectNode checkNode = checksArray.addObject();
- checkNode.put("type", verifier.getType());
- checkNode.put("completed", result.isCompleted());
- checkNode.put("pass", result.passed());
+ private void writeVerificationResult(SequenceWriter sequenceWriter,
+ AtomicBoolean allKeysPassed, KeyVerificationResult keyResult) {
+ try {
+ allKeysPassed.compareAndSet(true, keyResult.isKeyPass());
+ if (!keyResult.isKeyPass() || allResults) {
+ ObjectNode keyNode = OBJECT_MAPPER.convertValue(keyResult,
ObjectNode.class);
+ sequenceWriter.write(keyNode);
+ }
+ } catch (IOException e) {
+ LOG.error("Error writing verification result", e);
+ throw new CompletionException(e);
+ }
+ }
- ArrayNode failuresArray = checkNode.putArray("failures");
- for (String failure : result.getFailures()) {
- failuresArray.addObject().put("message", failure);
+ private KeyVerificationResult verifyKey(OzoneClient ozoneClient, String
volumeName,
+ String bucketName, String keyName) {
+ try {
+ boolean keyPass = true;
+ OmKeyInfo keyInfo =
Review Comment:
I'm not sure we want to parallelize the getKeyInfo call, at least not at the
same level as the verifiers. The verifiers are spread out across the datanodes,
so you can increase the number of threads as cluster size increases without
overloading any single node. If getKeyInfo is included in this, you have to
worry about overloading the OM too.
For the initial implementation I think we should leave getKeyInfo in the
single threaded execution section with the listing. A future improvement could
be to use a separate executor + thread pool for this portion.
For the initial implementation I think we should leave getKeyInfo in the
single threaded execution section with the listing. A future improvement could
be to use a separate executor + thread pool for this portion.
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ReplicasVerify.java:
##########
@@ -67,21 +96,59 @@ public class ReplicasVerify extends Handler {
@CommandLine.ArgGroup(exclusive = false, multiplicity = "1")
private Verification verification;
- private List<ReplicaVerifier> replicaVerifiers;
+ @CommandLine.Option(names = "--threads",
+ description = "Number of threads to use for verification",
+ defaultValue = "10")
+ private int threadCount;
+
+ private ExecutorService verificationExecutor;
+ private ExecutorService writerExecutor;
+ private ThreadLocal<List<ReplicaVerifier>> threadLocalVerifiers;
+ private List<CompletableFuture<Void>> allFutures;
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .configure(SerializationFeature.INDENT_OUTPUT, true);
+ private static final ObjectWriter WRITER = OBJECT_MAPPER.writer();
@Override
protected void execute(OzoneClient client, OzoneAddress address) throws
IOException {
- replicaVerifiers = new ArrayList<>();
-
- if (verification.doExecuteChecksums) {
- replicaVerifiers.add(new ChecksumVerifier(getConf()));
- }
+ allFutures = new ArrayList<>();
+ verificationExecutor = Executors.newFixedThreadPool(threadCount);
+ writerExecutor = Executors.newSingleThreadExecutor();
+ threadLocalVerifiers = ThreadLocal.withInitial(() -> {
+ List<ReplicaVerifier> verifiers = new ArrayList<>();
+ try {
+ if (verification.doExecuteChecksums) {
+ verifiers.add(new ChecksumVerifier(getConf()));
+ }
- if (verification.doExecuteBlockExistence) {
- replicaVerifiers.add(new BlockExistenceVerifier(getConf()));
+ if (verification.doExecuteBlockExistence) {
+ verifiers.add(new BlockExistenceVerifier(getConf()));
+ }
+ } catch (IOException e) {
+ LOG.error("Error initializing verifiers", e);
+ throw new RuntimeException("Error initializing verifiers", e);
+ }
+ return verifiers;
+ });
+
+ try {
+ createOutputDirectory();
+ findCandidateKeys(client, address);
+ } finally {
+ verificationExecutor.shutdown();
+ writerExecutor.shutdown();
+
+ try {
+ // Wait for all tasks to complete
+ verificationExecutor.awaitTermination(1, TimeUnit.DAYS);
+ writerExecutor.awaitTermination(1, TimeUnit.DAYS);
+ threadLocalVerifiers.remove();
Review Comment:
The thread invoking this never created any thread local instances, so it
won't have any effect. I don't think we need to call this explicitly and can
just let the thread local instances get gc'ed when the threads terminate.
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ReplicasVerify.java:
##########
@@ -67,21 +96,59 @@ public class ReplicasVerify extends Handler {
@CommandLine.ArgGroup(exclusive = false, multiplicity = "1")
private Verification verification;
- private List<ReplicaVerifier> replicaVerifiers;
+ @CommandLine.Option(names = "--threads",
+ description = "Number of threads to use for verification",
+ defaultValue = "10")
+ private int threadCount;
+
+ private ExecutorService verificationExecutor;
+ private ExecutorService writerExecutor;
+ private ThreadLocal<List<ReplicaVerifier>> threadLocalVerifiers;
+ private List<CompletableFuture<Void>> allFutures;
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .configure(SerializationFeature.INDENT_OUTPUT, true);
+ private static final ObjectWriter WRITER = OBJECT_MAPPER.writer();
@Override
protected void execute(OzoneClient client, OzoneAddress address) throws
IOException {
- replicaVerifiers = new ArrayList<>();
-
- if (verification.doExecuteChecksums) {
- replicaVerifiers.add(new ChecksumVerifier(getConf()));
- }
+ allFutures = new ArrayList<>();
+ verificationExecutor = Executors.newFixedThreadPool(threadCount);
+ writerExecutor = Executors.newSingleThreadExecutor();
+ threadLocalVerifiers = ThreadLocal.withInitial(() -> {
+ List<ReplicaVerifier> verifiers = new ArrayList<>();
+ try {
+ if (verification.doExecuteChecksums) {
+ verifiers.add(new ChecksumVerifier(getConf()));
+ }
- if (verification.doExecuteBlockExistence) {
- replicaVerifiers.add(new BlockExistenceVerifier(getConf()));
+ if (verification.doExecuteBlockExistence) {
+ verifiers.add(new BlockExistenceVerifier(getConf()));
+ }
+ } catch (IOException e) {
+ LOG.error("Error initializing verifiers", e);
+ throw new RuntimeException("Error initializing verifiers", e);
+ }
+ return verifiers;
+ });
+
+ try {
+ createOutputDirectory();
Review Comment:
Splitting output to multiple JSON files in a directory will be implemented
in HDDS-13063. For now, let's remove the unused directory option and send all
JSON to stdout.
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ReplicasVerify.java:
##########
@@ -95,132 +162,178 @@ void findCandidateKeys(OzoneClient ozoneClient,
OzoneAddress address) throws IOE
String bucketName = address.getBucketName();
String keyName = address.getKeyName();
- ObjectNode root = JsonUtils.createObjectNode(null);
- ArrayNode keysArray = root.putArray("keys");
-
AtomicBoolean allKeysPassed = new AtomicBoolean(true);
+ File outputFile = new File(outputDir, "replicas-verify-result.json");
+
+ try (OutputStream outputStream =
Files.newOutputStream(outputFile.toPath());
+ JsonGenerator jsonGenerator =
JSON_FACTORY.createGenerator(outputStream, JsonEncoding.UTF8)) {
+ // open json
+ jsonGenerator.useDefaultPrettyPrinter();
+ jsonGenerator.writeStartObject();
+ jsonGenerator.writeFieldName("keys");
+ jsonGenerator.writeStartArray();
+ jsonGenerator.flush();
+
+ try (SequenceWriter sequenceWriter = createSequenceWriter(false,
jsonGenerator)) {
+ // Process keys based on the provided address
+ if (!keyName.isEmpty()) {
+ processKey(ozoneClient, volumeName, bucketName, keyName,
sequenceWriter, allKeysPassed);
+ } else if (!bucketName.isEmpty()) {
+ OzoneVolume volume = objectStore.getVolume(volumeName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+ checkBucket(ozoneClient, bucket, sequenceWriter, allKeysPassed);
+ } else if (!volumeName.isEmpty()) {
+ OzoneVolume volume = objectStore.getVolume(volumeName);
+ checkVolume(ozoneClient, volume, sequenceWriter, allKeysPassed);
+ } else {
+ for (Iterator<? extends OzoneVolume> it =
objectStore.listVolumes(null); it.hasNext();) {
+ checkVolume(ozoneClient, it.next(), sequenceWriter, allKeysPassed);
+ }
+ }
+
+ // Wait for all futures to complete
+ CompletableFuture<Void> allOf =
CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[0]));
+ try {
+ allOf.join();
+ } catch (Exception e) {
+ LOG.error("Error during verification", e);
+ }
+ }
- if (!keyName.isEmpty()) {
- processKey(ozoneClient, volumeName, bucketName, keyName, keysArray,
allKeysPassed);
- } else if (!bucketName.isEmpty()) {
- OzoneVolume volume = objectStore.getVolume(volumeName);
- OzoneBucket bucket = volume.getBucket(bucketName);
- checkBucket(ozoneClient, bucket, keysArray, allKeysPassed);
- } else if (!volumeName.isEmpty()) {
- OzoneVolume volume = objectStore.getVolume(volumeName);
- checkVolume(ozoneClient, volume, keysArray, allKeysPassed);
- } else {
- for (Iterator<? extends OzoneVolume> it = objectStore.listVolumes(null);
it.hasNext();) {
- checkVolume(ozoneClient, it.next(), keysArray, allKeysPassed);
+ // close json
+ try {
+ jsonGenerator.writeEndArray();
+ jsonGenerator.writeBooleanField("pass", allKeysPassed.get());
+ jsonGenerator.writeEndObject();
+ } catch (Exception e) {
+ LOG.error("Exception in closing the JSON structure", e);
}
}
- root.put("pass", allKeysPassed.get());
- System.out.println(JsonUtils.toJsonStringWithDefaultPrettyPrinter(root));
}
- void checkVolume(OzoneClient ozoneClient, OzoneVolume volume, ArrayNode
keysArray, AtomicBoolean allKeysPassed)
- throws IOException {
+ void checkVolume(OzoneClient ozoneClient, OzoneVolume volume,
+ SequenceWriter sequenceWriter, AtomicBoolean allKeysPassed) throws
IOException {
for (Iterator<? extends OzoneBucket> it = volume.listBuckets(null);
it.hasNext();) {
OzoneBucket bucket = it.next();
- checkBucket(ozoneClient, bucket, keysArray, allKeysPassed);
+ checkBucket(ozoneClient, bucket, sequenceWriter, allKeysPassed);
}
}
- void checkBucket(OzoneClient ozoneClient, OzoneBucket bucket, ArrayNode
keysArray, AtomicBoolean allKeysPassed)
- throws IOException {
+ void checkBucket(OzoneClient ozoneClient, OzoneBucket bucket,
+ SequenceWriter sequenceWriter, AtomicBoolean allKeysPassed) throws
IOException {
for (Iterator<? extends OzoneKey> it = bucket.listKeys(null);
it.hasNext();) {
OzoneKey key = it.next();
// TODO: Remove this check once HDDS-12094 is fixed
if (!key.getName().endsWith("/")) {
- processKey(ozoneClient, key.getVolumeName(), key.getBucketName(),
key.getName(), keysArray, allKeysPassed);
+ processKey(ozoneClient, key.getVolumeName(), key.getBucketName(),
+ key.getName(), sequenceWriter, allKeysPassed);
}
}
}
- void processKey(OzoneClient ozoneClient, String volumeName, String
bucketName, String keyName,
- ArrayNode keysArray, AtomicBoolean allKeysPassed) throws IOException {
- OmKeyInfo keyInfo = ozoneClient.getProxy().getKeyInfo(
- volumeName, bucketName, keyName, false);
-
- ObjectNode keyNode = JsonUtils.createObjectNode(null);
- keyNode.put("volumeName", volumeName);
- keyNode.put("bucketName", bucketName);
- keyNode.put("name", keyName);
-
- ArrayNode blocksArray = keyNode.putArray("blocks");
- boolean keyPass = true;
-
- for (OmKeyLocationInfo keyLocation :
keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly()) {
- long containerID = keyLocation.getContainerID();
- long localID = keyLocation.getLocalID();
-
- ObjectNode blockNode = blocksArray.addObject();
- blockNode.put("containerID", containerID);
- blockNode.put("blockID", localID);
-
- ArrayNode replicasArray = blockNode.putArray("replicas");
- boolean blockPass = true;
-
- for (DatanodeDetails datanode : keyLocation.getPipeline().getNodes()) {
- ObjectNode replicaNode = replicasArray.addObject();
-
- ObjectNode datanodeNode = replicaNode.putObject("datanode");
- datanodeNode.put("uuid", datanode.getUuidString());
- datanodeNode.put("hostname", datanode.getHostName());
-
- ArrayNode checksArray = replicaNode.putArray("checks");
- boolean replicaPass = true;
- int replicaIndex = keyLocation.getPipeline().getReplicaIndex(datanode);
+ private void processKey(OzoneClient ozoneClient, String volumeName, String
bucketName,
+ String keyName, SequenceWriter sequenceWriter, AtomicBoolean
allKeysPassed) {
+ CompletableFuture<Void> future =
+ CompletableFuture.supplyAsync(() ->
+ verifyKey(ozoneClient, volumeName, bucketName, keyName),
verificationExecutor)
+ .handleAsync((keyResult, throwable) -> {
Review Comment:
I don't think we need this stage. `verifyKey` handles all checked exceptions
and sets the result accordingly. If we somehow get a `RuntimeException`, we
should let it crash the process.
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ReplicasVerify.java:
##########
@@ -67,21 +96,59 @@ public class ReplicasVerify extends Handler {
@CommandLine.ArgGroup(exclusive = false, multiplicity = "1")
private Verification verification;
- private List<ReplicaVerifier> replicaVerifiers;
+ @CommandLine.Option(names = "--threads",
+ description = "Number of threads to use for verification",
+ defaultValue = "10")
+ private int threadCount;
+
+ private ExecutorService verificationExecutor;
+ private ExecutorService writerExecutor;
+ private ThreadLocal<List<ReplicaVerifier>> threadLocalVerifiers;
+ private List<CompletableFuture<Void>> allFutures;
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .configure(SerializationFeature.INDENT_OUTPUT, true);
+ private static final ObjectWriter WRITER = OBJECT_MAPPER.writer();
@Override
protected void execute(OzoneClient client, OzoneAddress address) throws
IOException {
- replicaVerifiers = new ArrayList<>();
-
- if (verification.doExecuteChecksums) {
- replicaVerifiers.add(new ChecksumVerifier(getConf()));
- }
+ allFutures = new ArrayList<>();
+ verificationExecutor = Executors.newFixedThreadPool(threadCount);
+ writerExecutor = Executors.newSingleThreadExecutor();
Review Comment:
These executors work from an unbounded queue which could blow up memory if
verifiers are running much slower than listing. We need to create back pressure
on the listing thread in this case. For the verifier thread pool we can
probably bound the queue at a multiple of the consuming thread pool size to
create some buffer if verifiers are finishing in a bursty manner. The writer
thread should always run the fastest and could probably get away without
bounding the queue, but since we may be working with millions of objects we
should bound it to be safe.
Here I've suggested `2*threadCount` as the bound for the verifier executor
so it scales with thread count while leaving a reasonable buffer to prefetch
listings so the verifier threads always have work. For the writer I've
suggested bounding the queue at `threadCount` so that all verifier threads can
potentially flush results simultaneously with minimal blocking (the queue push
operation is still synchronized). The writer is expected to get through all of
these serially before the next round of verifiers finish since it is the only
stage that does not need to do network IO. I've suggested `LinkedBlockingQueue`
over `ArrayBlockingQueue` since the linked list implementation does not share a
lock between the push and pop operations required by the producer and consumer.
This initial suggestion is based more on intuition, and in the future we can
benchmark this and make modifications as necessary.
```suggestion
verificationExecutor = new ThreadPoolExecutor(threadCount, threadCount,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(2 * threadCount), new
ThreadPoolExecutor.CallerRunsPolicy());
writerExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(threadCount), new
ThreadPoolExecutor.CallerRunsPolicy());
```
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ReplicasVerify.java:
##########
@@ -67,21 +96,59 @@ public class ReplicasVerify extends Handler {
@CommandLine.ArgGroup(exclusive = false, multiplicity = "1")
private Verification verification;
- private List<ReplicaVerifier> replicaVerifiers;
+ @CommandLine.Option(names = "--threads",
+ description = "Number of threads to use for verification",
+ defaultValue = "10")
+ private int threadCount;
+
+ private ExecutorService verificationExecutor;
+ private ExecutorService writerExecutor;
+ private ThreadLocal<List<ReplicaVerifier>> threadLocalVerifiers;
Review Comment:
Not technically required in this context but `static ThreadLocal` is usually
used to prevent confusion with multiple instances created by the same thread.
```suggestion
private static ThreadLocal<List<ReplicaVerifier>> threadLocalVerifiers;
```
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ReplicasVerify.java:
##########
@@ -67,21 +96,59 @@ public class ReplicasVerify extends Handler {
@CommandLine.ArgGroup(exclusive = false, multiplicity = "1")
private Verification verification;
- private List<ReplicaVerifier> replicaVerifiers;
+ @CommandLine.Option(names = "--threads",
+ description = "Number of threads to use for verification",
+ defaultValue = "10")
+ private int threadCount;
+
+ private ExecutorService verificationExecutor;
+ private ExecutorService writerExecutor;
+ private ThreadLocal<List<ReplicaVerifier>> threadLocalVerifiers;
+ private List<CompletableFuture<Void>> allFutures;
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .configure(SerializationFeature.INDENT_OUTPUT, true);
+ private static final ObjectWriter WRITER = OBJECT_MAPPER.writer();
@Override
protected void execute(OzoneClient client, OzoneAddress address) throws
IOException {
- replicaVerifiers = new ArrayList<>();
-
- if (verification.doExecuteChecksums) {
- replicaVerifiers.add(new ChecksumVerifier(getConf()));
- }
+ allFutures = new ArrayList<>();
+ verificationExecutor = Executors.newFixedThreadPool(threadCount);
+ writerExecutor = Executors.newSingleThreadExecutor();
+ threadLocalVerifiers = ThreadLocal.withInitial(() -> {
+ List<ReplicaVerifier> verifiers = new ArrayList<>();
+ try {
+ if (verification.doExecuteChecksums) {
+ verifiers.add(new ChecksumVerifier(getConf()));
+ }
- if (verification.doExecuteBlockExistence) {
- replicaVerifiers.add(new BlockExistenceVerifier(getConf()));
+ if (verification.doExecuteBlockExistence) {
+ verifiers.add(new BlockExistenceVerifier(getConf()));
+ }
+ } catch (IOException e) {
+ LOG.error("Error initializing verifiers", e);
+ throw new RuntimeException("Error initializing verifiers", e);
+ }
+ return verifiers;
+ });
+
+ try {
+ createOutputDirectory();
+ findCandidateKeys(client, address);
Review Comment:
We should make `findCandidateKeys` not throw any checked exceptions and just
continue reading keys if there are errors. Right now, if there's one key that
fails to read, the whole thing will fail. After removing the output dir
creation as suggested above, this inner try/finally can be removed.
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ReplicasVerify.java:
##########
@@ -67,21 +96,59 @@ public class ReplicasVerify extends Handler {
@CommandLine.ArgGroup(exclusive = false, multiplicity = "1")
private Verification verification;
- private List<ReplicaVerifier> replicaVerifiers;
+ @CommandLine.Option(names = "--threads",
+ description = "Number of threads to use for verification",
+ defaultValue = "10")
+ private int threadCount;
+
+ private ExecutorService verificationExecutor;
+ private ExecutorService writerExecutor;
+ private ThreadLocal<List<ReplicaVerifier>> threadLocalVerifiers;
+ private List<CompletableFuture<Void>> allFutures;
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .configure(SerializationFeature.INDENT_OUTPUT, true);
+ private static final ObjectWriter WRITER = OBJECT_MAPPER.writer();
@Override
protected void execute(OzoneClient client, OzoneAddress address) throws
IOException {
- replicaVerifiers = new ArrayList<>();
-
- if (verification.doExecuteChecksums) {
- replicaVerifiers.add(new ChecksumVerifier(getConf()));
- }
+ allFutures = new ArrayList<>();
+ verificationExecutor = Executors.newFixedThreadPool(threadCount);
+ writerExecutor = Executors.newSingleThreadExecutor();
+ threadLocalVerifiers = ThreadLocal.withInitial(() -> {
+ List<ReplicaVerifier> verifiers = new ArrayList<>();
+ try {
+ if (verification.doExecuteChecksums) {
+ verifiers.add(new ChecksumVerifier(getConf()));
+ }
- if (verification.doExecuteBlockExistence) {
- replicaVerifiers.add(new BlockExistenceVerifier(getConf()));
+ if (verification.doExecuteBlockExistence) {
+ verifiers.add(new BlockExistenceVerifier(getConf()));
+ }
+ } catch (IOException e) {
+ LOG.error("Error initializing verifiers", e);
+ throw new RuntimeException("Error initializing verifiers", e);
+ }
+ return verifiers;
+ });
+
+ try {
+ createOutputDirectory();
+ findCandidateKeys(client, address);
+ } finally {
+ verificationExecutor.shutdown();
+ writerExecutor.shutdown();
+
+ try {
+ // Wait for all tasks to complete
+ verificationExecutor.awaitTermination(1, TimeUnit.DAYS);
+ writerExecutor.awaitTermination(1, TimeUnit.DAYS);
+ threadLocalVerifiers.remove();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.error("Interrupted while waiting for verification to complete", e);
Review Comment:
This should throw a `RuntimeException` to terminate the process and have
PicoCLI display the error message and exit non-zero. Logging can be useful if
we want to capture the stack trace for debugging (like failing to init the
validators), but in this case I don't think it helps much.
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/KeyVerificationResult.java:
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.debug.replicas;
+
+import java.util.List;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+/**
+ * Class to hold the verification results for a key.
+ */
+public class KeyVerificationResult {
+ private final boolean keyPass;
Review Comment:
I don't think this is in the JSON output currently, but we probably want a
`keyCompleted` field as well, which would map to the AND of all block
verifiers' `completed` statuses. This is used to indicate that a check failed
without actually being able to run the check (like datanode not reachable). We
would also set this to false if the `getKeyInfo` call fails.
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ReplicasVerify.java:
##########
@@ -95,132 +162,178 @@ void findCandidateKeys(OzoneClient ozoneClient,
OzoneAddress address) throws IOE
String bucketName = address.getBucketName();
String keyName = address.getKeyName();
- ObjectNode root = JsonUtils.createObjectNode(null);
- ArrayNode keysArray = root.putArray("keys");
-
AtomicBoolean allKeysPassed = new AtomicBoolean(true);
+ File outputFile = new File(outputDir, "replicas-verify-result.json");
+
+ try (OutputStream outputStream =
Files.newOutputStream(outputFile.toPath());
+ JsonGenerator jsonGenerator =
JSON_FACTORY.createGenerator(outputStream, JsonEncoding.UTF8)) {
+ // open json
+ jsonGenerator.useDefaultPrettyPrinter();
+ jsonGenerator.writeStartObject();
+ jsonGenerator.writeFieldName("keys");
+ jsonGenerator.writeStartArray();
+ jsonGenerator.flush();
+
+ try (SequenceWriter sequenceWriter = createSequenceWriter(false,
jsonGenerator)) {
+ // Process keys based on the provided address
+ if (!keyName.isEmpty()) {
+ processKey(ozoneClient, volumeName, bucketName, keyName,
sequenceWriter, allKeysPassed);
+ } else if (!bucketName.isEmpty()) {
+ OzoneVolume volume = objectStore.getVolume(volumeName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+ checkBucket(ozoneClient, bucket, sequenceWriter, allKeysPassed);
+ } else if (!volumeName.isEmpty()) {
+ OzoneVolume volume = objectStore.getVolume(volumeName);
+ checkVolume(ozoneClient, volume, sequenceWriter, allKeysPassed);
+ } else {
+ for (Iterator<? extends OzoneVolume> it =
objectStore.listVolumes(null); it.hasNext();) {
+ checkVolume(ozoneClient, it.next(), sequenceWriter, allKeysPassed);
+ }
+ }
+
+ // Wait for all futures to complete
+ CompletableFuture<Void> allOf =
CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[0]));
+ try {
+ allOf.join();
+ } catch (Exception e) {
+ LOG.error("Error during verification", e);
+ }
+ }
- if (!keyName.isEmpty()) {
- processKey(ozoneClient, volumeName, bucketName, keyName, keysArray,
allKeysPassed);
- } else if (!bucketName.isEmpty()) {
- OzoneVolume volume = objectStore.getVolume(volumeName);
- OzoneBucket bucket = volume.getBucket(bucketName);
- checkBucket(ozoneClient, bucket, keysArray, allKeysPassed);
- } else if (!volumeName.isEmpty()) {
- OzoneVolume volume = objectStore.getVolume(volumeName);
- checkVolume(ozoneClient, volume, keysArray, allKeysPassed);
- } else {
- for (Iterator<? extends OzoneVolume> it = objectStore.listVolumes(null);
it.hasNext();) {
- checkVolume(ozoneClient, it.next(), keysArray, allKeysPassed);
+ // close json
+ try {
+ jsonGenerator.writeEndArray();
+ jsonGenerator.writeBooleanField("pass", allKeysPassed.get());
+ jsonGenerator.writeEndObject();
+ } catch (Exception e) {
+ LOG.error("Exception in closing the JSON structure", e);
}
}
- root.put("pass", allKeysPassed.get());
- System.out.println(JsonUtils.toJsonStringWithDefaultPrettyPrinter(root));
}
- void checkVolume(OzoneClient ozoneClient, OzoneVolume volume, ArrayNode
keysArray, AtomicBoolean allKeysPassed)
- throws IOException {
+ void checkVolume(OzoneClient ozoneClient, OzoneVolume volume,
+ SequenceWriter sequenceWriter, AtomicBoolean allKeysPassed) throws
IOException {
for (Iterator<? extends OzoneBucket> it = volume.listBuckets(null);
it.hasNext();) {
OzoneBucket bucket = it.next();
- checkBucket(ozoneClient, bucket, keysArray, allKeysPassed);
+ checkBucket(ozoneClient, bucket, sequenceWriter, allKeysPassed);
}
}
- void checkBucket(OzoneClient ozoneClient, OzoneBucket bucket, ArrayNode
keysArray, AtomicBoolean allKeysPassed)
- throws IOException {
+ void checkBucket(OzoneClient ozoneClient, OzoneBucket bucket,
+ SequenceWriter sequenceWriter, AtomicBoolean allKeysPassed) throws
IOException {
for (Iterator<? extends OzoneKey> it = bucket.listKeys(null);
it.hasNext();) {
OzoneKey key = it.next();
// TODO: Remove this check once HDDS-12094 is fixed
if (!key.getName().endsWith("/")) {
- processKey(ozoneClient, key.getVolumeName(), key.getBucketName(),
key.getName(), keysArray, allKeysPassed);
+ processKey(ozoneClient, key.getVolumeName(), key.getBucketName(),
+ key.getName(), sequenceWriter, allKeysPassed);
}
}
}
- void processKey(OzoneClient ozoneClient, String volumeName, String
bucketName, String keyName,
- ArrayNode keysArray, AtomicBoolean allKeysPassed) throws IOException {
- OmKeyInfo keyInfo = ozoneClient.getProxy().getKeyInfo(
- volumeName, bucketName, keyName, false);
-
- ObjectNode keyNode = JsonUtils.createObjectNode(null);
- keyNode.put("volumeName", volumeName);
- keyNode.put("bucketName", bucketName);
- keyNode.put("name", keyName);
-
- ArrayNode blocksArray = keyNode.putArray("blocks");
- boolean keyPass = true;
-
- for (OmKeyLocationInfo keyLocation :
keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly()) {
- long containerID = keyLocation.getContainerID();
- long localID = keyLocation.getLocalID();
-
- ObjectNode blockNode = blocksArray.addObject();
- blockNode.put("containerID", containerID);
- blockNode.put("blockID", localID);
-
- ArrayNode replicasArray = blockNode.putArray("replicas");
- boolean blockPass = true;
-
- for (DatanodeDetails datanode : keyLocation.getPipeline().getNodes()) {
- ObjectNode replicaNode = replicasArray.addObject();
-
- ObjectNode datanodeNode = replicaNode.putObject("datanode");
- datanodeNode.put("uuid", datanode.getUuidString());
- datanodeNode.put("hostname", datanode.getHostName());
-
- ArrayNode checksArray = replicaNode.putArray("checks");
- boolean replicaPass = true;
- int replicaIndex = keyLocation.getPipeline().getReplicaIndex(datanode);
+ private void processKey(OzoneClient ozoneClient, String volumeName, String
bucketName,
+ String keyName, SequenceWriter sequenceWriter, AtomicBoolean
allKeysPassed) {
+ CompletableFuture<Void> future =
+ CompletableFuture.supplyAsync(() ->
+ verifyKey(ozoneClient, volumeName, bucketName, keyName),
verificationExecutor)
+ .handleAsync((keyResult, throwable) -> {
+ if (throwable != null) {
+ LOG.error("Error verifying key: {}/{}/{}", volumeName,
bucketName, keyName, throwable);
+ return new KeyVerificationResult(volumeName, bucketName,
keyName, new ArrayList<>(), false);
+ }
+ return keyResult;
+ }, verificationExecutor)
+ .thenAcceptAsync(keyResult ->
+ writeVerificationResult(sequenceWriter, allKeysPassed,
keyResult), writerExecutor);
+ allFutures.add(future);
+ }
- for (ReplicaVerifier verifier : replicaVerifiers) {
- BlockVerificationResult result = verifier.verifyBlock(datanode,
keyLocation, replicaIndex);
- ObjectNode checkNode = checksArray.addObject();
- checkNode.put("type", verifier.getType());
- checkNode.put("completed", result.isCompleted());
- checkNode.put("pass", result.passed());
+ private void writeVerificationResult(SequenceWriter sequenceWriter,
+ AtomicBoolean allKeysPassed, KeyVerificationResult keyResult) {
+ try {
+ allKeysPassed.compareAndSet(true, keyResult.isKeyPass());
+ if (!keyResult.isKeyPass() || allResults) {
+ ObjectNode keyNode = OBJECT_MAPPER.convertValue(keyResult,
ObjectNode.class);
+ sequenceWriter.write(keyNode);
+ }
+ } catch (IOException e) {
+ LOG.error("Error writing verification result", e);
+ throw new CompletionException(e);
+ }
+ }
- ArrayNode failuresArray = checkNode.putArray("failures");
- for (String failure : result.getFailures()) {
- failuresArray.addObject().put("message", failure);
+ private KeyVerificationResult verifyKey(OzoneClient ozoneClient, String
volumeName,
+ String bucketName, String keyName) {
+ try {
+ boolean keyPass = true;
+ OmKeyInfo keyInfo =
+ ozoneClient.getProxy().getKeyInfo(volumeName, bucketName, keyName,
false);
+
+ List<KeyVerificationResult.BlockVerificationData> blockResults = new
ArrayList<>();
Review Comment:
I don't think we need all these inner classes. `KeyVerificationResult` can
be constructed from just a volume, bucket, and key. It can have an `add` method
that takes the check type, `BlockVerificationResult`, and `DatanodeDetails`
and maintain this list internally. The key's `pass` boolean can be updated on
the fly with each block addition.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]