Jason918 commented on code in PR #21988:
URL: https://github.com/apache/pulsar/pull/21988#discussion_r1479186394


##########
pip/PIP-334.md:
##########
@@ -0,0 +1,201 @@
+
+
+# PIP-334: Create a new admin API: pck (Pulsar Consistency Checker).
+
+# Background knowledge
+We need an administrator interface to detect whether the topic metadata is 
consistent with the zookeeper.
+
+# Motivation
+Orphan ledgers can be detected through the PCK API when the bookkeeper's disk 
is abnormally high.
+If necessary, we can be deleted directly to free up disk space.
+
+
+# Goals
+
+## In Scope
+
+Create a new admin API:
+
+- POST /admin/v2/pck/detect-orphan-ledger
+  - Get ledgers from the topic's metadata and zookeeper, then compare them, 
find orphaned ledgers that are not used in the topic, and remove them if 
necessary.
+
+  
+
+
+
+# Detailed Design
+
+## Design & Implementation Details
+
+
+```java
+@Path("/pck")
+@Produces(MediaType.APPLICATION_JSON)
+@Api(value = "/pck", description = "pck apis", tags = "consistency checker")
+public class ConsistencyChecker extends AdminResource {
+
+    @POST
+    @Path("/detect-orphan-ledger")
+    @ApiOperation(value = "detect orphan ledger.", response = 
OrphanedLedgerInfo.class, responseContainer = "List")
+    @ApiResponses(value = {
+            @ApiResponse(code = 500, message = "Internal server error")})
+    public void detectOrphanLedger(@Suspended AsyncResponse asyncResponse,
+            @ApiParam(value = "Delete the orphan ledgers.")
+            @QueryParam("delete") @DefaultValue("false") boolean delete,
+            @ApiParam(value = "Delete the orphan ledgers.")

Review Comment:
   duplicate line? 



##########
pip/PIP-334.md:
##########
@@ -0,0 +1,201 @@
+
+
+# PIP-334: Create a new admin API: pck (Pulsar Consistency Checker).
+
+# Background knowledge
+We need an administrator interface to detect whether the topic metadata is 
consistent with the zookeeper.
+
+# Motivation
+Orphan ledgers can be detected through the PCK API when the bookkeeper's disk 
is abnormally high.
+If necessary, we can be deleted directly to free up disk space.
+
+
+# Goals
+
+## In Scope
+
+Create a new admin API:
+
+- POST /admin/v2/pck/detect-orphan-ledger
+  - Get ledgers from the topic's metadata and zookeeper, then compare them, 
find orphaned ledgers that are not used in the topic, and remove them if 
necessary.
+
+  
+
+
+
+# Detailed Design
+
+## Design & Implementation Details
+
+
+```java
+@Path("/pck")
+@Produces(MediaType.APPLICATION_JSON)
+@Api(value = "/pck", description = "pck apis", tags = "consistency checker")
+public class ConsistencyChecker extends AdminResource {
+
+    @POST
+    @Path("/detect-orphan-ledger")
+    @ApiOperation(value = "detect orphan ledger.", response = 
OrphanedLedgerInfo.class, responseContainer = "List")
+    @ApiResponses(value = {
+            @ApiResponse(code = 500, message = "Internal server error")})
+    public void detectOrphanLedger(@Suspended AsyncResponse asyncResponse,
+            @ApiParam(value = "Delete the orphan ledgers.")
+            @QueryParam("delete") @DefaultValue("false") boolean delete,
+            @ApiParam(value = "Delete the orphan ledgers.")
+            @ApiParam(value = "The minimum stale time (in days) for topic 
ledgers.", defaultValue = "7")
+            @PathParam("staleTime") int staleTime) {
+    }
+
+
+  private void internalDetectOrphanLedger(AsyncResponse asyncResponse, boolean 
delete, int staleTime) {
+    ManagedLedgerFactoryImpl managedLedgerFactory = (ManagedLedgerFactoryImpl) 
pulsar().getManagedLedgerFactory();
+    BookKeeperAdmin bookKeeperAdmin = new 
BookKeeperAdmin(managedLedgerFactory.getBookKeeper());
+    Set<Long> nonOrphanedLedgers = new ConcurrentHashSet<>();
+    int semaphorePermits = 1;
+    Semaphore semaphore = new Semaphore(semaphorePermits);
+    Set<BookieInfo> maybeOrphanedLedgers = new ConcurrentHashSet<>();
+    Set<OrphanedLedgerInfo> orphanedLedgers = new ConcurrentHashSet<>();
+    try {
+      bookKeeperAdmin.listLedgers().forEach(ledgerId -> {
+        if (!nonOrphanedLedgers.contains(ledgerId)) {
+          try {
+            semaphore.acquire();
+            openLedger(ledgerId).thenAccept((infosx) -> {
+              if (!infosx.isPulsarLedger()) {
+                log.warn("Ledger:{} is not a pulsar ledger.", ledgerId);
+                semaphore.release();
+                return;
+              }
+
+              TopicName topic = infosx.getLedgerTopic();
+              if (topic == null) {
+                maybeOrphanedLedgers.add(infosx);
+                semaphore.release();
+              } else {
+                infosx.stillExistsInMetaStore().thenAccept((exists) -> {
+                  if (!exists) {
+                    processOrphanedLedger(infosx, staleTime).whenComplete((r, 
ex) -> {
+                      r.ifPresent(orphanedLedgers::add);
+                      semaphore.release();
+                    });
+                  } else {
+                    
getTopicUsedLedgersAsync(topic.toString()).thenAccept((ledgers) -> {
+                      ledgers.forEach((lid) -> {
+                        nonOrphanedLedgers.add(lid);
+                        maybeOrphanedLedgers.remove(lid);
+                      });
+                      if (!ledgers.contains(ledgerId)) {
+                        processOrphanedLedger(infosx, 
staleTime).whenComplete((r, ex) -> {
+                          r.ifPresent(orphanedLedgers::add);
+                          semaphore.release();
+                        });
+                      } else {
+                        semaphore.release();
+                      }
+
+                    }).exceptionally((ex) -> {
+                      if (ex.getCause() instanceof 
PulsarAdminException.NotFoundException) {
+                        if (infosx.isSchemaLedger()) {
+                          maybeOrphanedLedgers.add(infosx);
+                          semaphore.release();
+                        } else {
+                          processOrphanedLedger(infosx, 
staleTime).whenComplete((r, ex1) -> {
+                            r.ifPresent(orphanedLedgers::add);
+                            semaphore.release();
+                          });
+                        }
+                      } else {
+                        log.error("Failed to get topic: {} ledger list: {}", 
topic, ex);
+                        semaphore.release();
+                      }
+
+                      return null;
+                    });
+                  }
+
+                }).exceptionally((ex) -> {
+                  log.error("Failed to get topic: {} ledger list: {}", topic, 
ex);
+                  semaphore.release();
+                  return null;
+                });
+              }
+
+
+            });
+          } catch (InterruptedException e) {
+            asyncResponse.resume(new RestException(e));
+          }
+        }
+      });
+
+      while (semaphore.availablePermits() != semaphorePermits) {

Review Comment:
   Is it better to use CountDownLatch here? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to