ptlrs commented on code in PR #8363:
URL: https://github.com/apache/ozone/pull/8363#discussion_r2096782792
##########
hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ReplicasVerify.java:
##########
@@ -95,132 +162,178 @@ void findCandidateKeys(OzoneClient ozoneClient,
OzoneAddress address) throws IOE
String bucketName = address.getBucketName();
String keyName = address.getKeyName();
- ObjectNode root = JsonUtils.createObjectNode(null);
- ArrayNode keysArray = root.putArray("keys");
-
AtomicBoolean allKeysPassed = new AtomicBoolean(true);
+ File outputFile = new File(outputDir, "replicas-verify-result.json");
+
+ try (OutputStream outputStream =
Files.newOutputStream(outputFile.toPath());
+ JsonGenerator jsonGenerator =
JSON_FACTORY.createGenerator(outputStream, JsonEncoding.UTF8)) {
+ // open json
+ jsonGenerator.useDefaultPrettyPrinter();
+ jsonGenerator.writeStartObject();
+ jsonGenerator.writeFieldName("keys");
+ jsonGenerator.writeStartArray();
+ jsonGenerator.flush();
+
+ try (SequenceWriter sequenceWriter = createSequenceWriter(false,
jsonGenerator)) {
+ // Process keys based on the provided address
+ if (!keyName.isEmpty()) {
+ processKey(ozoneClient, volumeName, bucketName, keyName,
sequenceWriter, allKeysPassed);
+ } else if (!bucketName.isEmpty()) {
+ OzoneVolume volume = objectStore.getVolume(volumeName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+ checkBucket(ozoneClient, bucket, sequenceWriter, allKeysPassed);
+ } else if (!volumeName.isEmpty()) {
+ OzoneVolume volume = objectStore.getVolume(volumeName);
+ checkVolume(ozoneClient, volume, sequenceWriter, allKeysPassed);
+ } else {
+ for (Iterator<? extends OzoneVolume> it =
objectStore.listVolumes(null); it.hasNext();) {
+ checkVolume(ozoneClient, it.next(), sequenceWriter, allKeysPassed);
+ }
+ }
+
+ // Wait for all futures to complete
+ CompletableFuture<Void> allOf =
CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[0]));
+ try {
+ allOf.join();
+ } catch (Exception e) {
+ LOG.error("Error during verification", e);
+ }
+ }
- if (!keyName.isEmpty()) {
- processKey(ozoneClient, volumeName, bucketName, keyName, keysArray,
allKeysPassed);
- } else if (!bucketName.isEmpty()) {
- OzoneVolume volume = objectStore.getVolume(volumeName);
- OzoneBucket bucket = volume.getBucket(bucketName);
- checkBucket(ozoneClient, bucket, keysArray, allKeysPassed);
- } else if (!volumeName.isEmpty()) {
- OzoneVolume volume = objectStore.getVolume(volumeName);
- checkVolume(ozoneClient, volume, keysArray, allKeysPassed);
- } else {
- for (Iterator<? extends OzoneVolume> it = objectStore.listVolumes(null);
it.hasNext();) {
- checkVolume(ozoneClient, it.next(), keysArray, allKeysPassed);
+ // close json
+ try {
+ jsonGenerator.writeEndArray();
+ jsonGenerator.writeBooleanField("pass", allKeysPassed.get());
+ jsonGenerator.writeEndObject();
+ } catch (Exception e) {
+ LOG.error("Exception in closing the JSON structure", e);
}
}
- root.put("pass", allKeysPassed.get());
- System.out.println(JsonUtils.toJsonStringWithDefaultPrettyPrinter(root));
}
- void checkVolume(OzoneClient ozoneClient, OzoneVolume volume, ArrayNode
keysArray, AtomicBoolean allKeysPassed)
- throws IOException {
+ void checkVolume(OzoneClient ozoneClient, OzoneVolume volume,
+ SequenceWriter sequenceWriter, AtomicBoolean allKeysPassed) throws
IOException {
for (Iterator<? extends OzoneBucket> it = volume.listBuckets(null);
it.hasNext();) {
OzoneBucket bucket = it.next();
- checkBucket(ozoneClient, bucket, keysArray, allKeysPassed);
+ checkBucket(ozoneClient, bucket, sequenceWriter, allKeysPassed);
}
}
- void checkBucket(OzoneClient ozoneClient, OzoneBucket bucket, ArrayNode
keysArray, AtomicBoolean allKeysPassed)
- throws IOException {
+ void checkBucket(OzoneClient ozoneClient, OzoneBucket bucket,
+ SequenceWriter sequenceWriter, AtomicBoolean allKeysPassed) throws
IOException {
for (Iterator<? extends OzoneKey> it = bucket.listKeys(null);
it.hasNext();) {
OzoneKey key = it.next();
// TODO: Remove this check once HDDS-12094 is fixed
if (!key.getName().endsWith("/")) {
- processKey(ozoneClient, key.getVolumeName(), key.getBucketName(),
key.getName(), keysArray, allKeysPassed);
+ processKey(ozoneClient, key.getVolumeName(), key.getBucketName(),
+ key.getName(), sequenceWriter, allKeysPassed);
}
}
}
- void processKey(OzoneClient ozoneClient, String volumeName, String
bucketName, String keyName,
- ArrayNode keysArray, AtomicBoolean allKeysPassed) throws IOException {
- OmKeyInfo keyInfo = ozoneClient.getProxy().getKeyInfo(
- volumeName, bucketName, keyName, false);
-
- ObjectNode keyNode = JsonUtils.createObjectNode(null);
- keyNode.put("volumeName", volumeName);
- keyNode.put("bucketName", bucketName);
- keyNode.put("name", keyName);
-
- ArrayNode blocksArray = keyNode.putArray("blocks");
- boolean keyPass = true;
-
- for (OmKeyLocationInfo keyLocation :
keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly()) {
- long containerID = keyLocation.getContainerID();
- long localID = keyLocation.getLocalID();
-
- ObjectNode blockNode = blocksArray.addObject();
- blockNode.put("containerID", containerID);
- blockNode.put("blockID", localID);
-
- ArrayNode replicasArray = blockNode.putArray("replicas");
- boolean blockPass = true;
-
- for (DatanodeDetails datanode : keyLocation.getPipeline().getNodes()) {
- ObjectNode replicaNode = replicasArray.addObject();
-
- ObjectNode datanodeNode = replicaNode.putObject("datanode");
- datanodeNode.put("uuid", datanode.getUuidString());
- datanodeNode.put("hostname", datanode.getHostName());
-
- ArrayNode checksArray = replicaNode.putArray("checks");
- boolean replicaPass = true;
- int replicaIndex = keyLocation.getPipeline().getReplicaIndex(datanode);
+ private void processKey(OzoneClient ozoneClient, String volumeName, String
bucketName,
+ String keyName, SequenceWriter sequenceWriter, AtomicBoolean
allKeysPassed) {
+ CompletableFuture<Void> future =
+ CompletableFuture.supplyAsync(() ->
+ verifyKey(ozoneClient, volumeName, bucketName, keyName),
verificationExecutor)
+ .handleAsync((keyResult, throwable) -> {
+ if (throwable != null) {
+ LOG.error("Error verifying key: {}/{}/{}", volumeName,
bucketName, keyName, throwable);
+ return new KeyVerificationResult(volumeName, bucketName,
keyName, new ArrayList<>(), false);
+ }
+ return keyResult;
+ }, verificationExecutor)
+ .thenAcceptAsync(keyResult ->
+ writeVerificationResult(sequenceWriter, allKeysPassed,
keyResult), writerExecutor);
+ allFutures.add(future);
+ }
- for (ReplicaVerifier verifier : replicaVerifiers) {
- BlockVerificationResult result = verifier.verifyBlock(datanode,
keyLocation, replicaIndex);
- ObjectNode checkNode = checksArray.addObject();
- checkNode.put("type", verifier.getType());
- checkNode.put("completed", result.isCompleted());
- checkNode.put("pass", result.passed());
+ private void writeVerificationResult(SequenceWriter sequenceWriter,
+ AtomicBoolean allKeysPassed, KeyVerificationResult keyResult) {
+ try {
+ allKeysPassed.compareAndSet(true, keyResult.isKeyPass());
+ if (!keyResult.isKeyPass() || allResults) {
+ ObjectNode keyNode = OBJECT_MAPPER.convertValue(keyResult,
ObjectNode.class);
+ sequenceWriter.write(keyNode);
+ }
+ } catch (IOException e) {
+ LOG.error("Error writing verification result", e);
+ throw new CompletionException(e);
+ }
+ }
- ArrayNode failuresArray = checkNode.putArray("failures");
- for (String failure : result.getFailures()) {
- failuresArray.addObject().put("message", failure);
+ private KeyVerificationResult verifyKey(OzoneClient ozoneClient, String
volumeName,
+ String bucketName, String keyName) {
+ try {
+ boolean keyPass = true;
+ OmKeyInfo keyInfo =
+ ozoneClient.getProxy().getKeyInfo(volumeName, bucketName, keyName,
false);
+
+ List<KeyVerificationResult.BlockVerificationData> blockResults = new
ArrayList<>();
Review Comment:
I'm not sure I follow this suggestion. The inner classes exist so that the
expected JSON structure is generated by mapping the structure of the class, via:
https://github.com/apache/ozone/blob/adeb472dd59aa79567fda14b59f3082c80a0653b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/replicas/ReplicasVerify.java#L256
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]