horizonzy commented on code in PR #3637:
URL: https://github.com/apache/bookkeeper/pull/3637#discussion_r1102236736


##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java:
##########
@@ -343,167 +154,43 @@ public Auditor(final String bookieIdentifier,
                    StatsLogger statsLogger)
             throws UnavailableException {
         this.conf = conf;
-        this.underreplicatedLedgerRecoveryGracePeriod = 
conf.getUnderreplicatedLedgerRecoveryGracePeriod();
-        this.zkOpTimeoutMs = conf.getZkTimeout() * 2;
         this.bookieIdentifier = bookieIdentifier;
-        this.statsLogger = statsLogger;
-        this.numOfLedgersFoundNotAdheringInPlacementPolicyCheck = new 
AtomicInteger(0);
-        this.ledgersNotAdheringToPlacementPolicyGuageValue = new 
AtomicInteger(0);
-        this.numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck = new 
AtomicInteger(0);
-        this.ledgersSoftlyAdheringToPlacementPolicyGuageValue = new 
AtomicInteger(0);
-        this.numOfClosedLedgersAuditedInPlacementPolicyCheck = new 
AtomicInteger(0);
-        this.numOfURLedgersElapsedRecoveryGracePeriod = new AtomicInteger(0);
-        this.numOfURLedgersElapsedRecoveryGracePeriodGuageValue = new 
AtomicInteger(0);
-        this.numLedgersHavingNoReplicaOfAnEntryGuageValue = new 
AtomicInteger(0);
-        this.numLedgersFoundHavingNoReplicaOfAnEntry = new AtomicInteger(0);
-        this.numLedgersHavingLessThanAQReplicasOfAnEntryGuageValue = new 
AtomicInteger(0);
-        this.numLedgersFoundHavingLessThanAQReplicasOfAnEntry = new 
AtomicInteger(0);
-        this.numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue = new 
AtomicInteger(0);
-        this.numLedgersFoundHavingLessThanWQReplicasOfAnEntry = new 
AtomicInteger(0);
-
-        if (conf.getAuditorMaxNumberOfConcurrentOpenLedgerOperations() <= 0) {
-            LOG.error("auditorMaxNumberOfConcurrentOpenLedgerOperations should 
be greater than 0");
-            throw new 
UnavailableException("auditorMaxNumberOfConcurrentOpenLedgerOperations should 
be greater than 0");
-        }
-        this.openLedgerNoRecoverySemaphore = new 
Semaphore(conf.getAuditorMaxNumberOfConcurrentOpenLedgerOperations());
-
-        if (conf.getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec() 
< 0) {
-            LOG.error("auditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec 
should be greater than or equal to 0");
-            throw new 
UnavailableException("auditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec "
-                    + "should be greater than or equal to 0");
-        }
-        this.openLedgerNoRecoverySemaphoreWaitTimeoutMSec =
-                
conf.getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec();
-
-        this.underReplicatedLedgersGuageValue = new AtomicInteger(0);
-        numUnderReplicatedLedger = 
this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS);
-        underReplicatedLedgerTotalSize = 
this.statsLogger.getOpStatsLogger(UNDER_REPLICATED_LEDGERS_TOTAL_SIZE);
-        uRLPublishTimeForLostBookies = this.statsLogger
-                
.getOpStatsLogger(ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE);
-        bookieToLedgersMapCreationTime = this.statsLogger
-                
.getOpStatsLogger(ReplicationStats.BOOKIE_TO_LEDGERS_MAP_CREATION_TIME);
-        checkAllLedgersTime = 
this.statsLogger.getOpStatsLogger(ReplicationStats.CHECK_ALL_LEDGERS_TIME);
-        placementPolicyCheckTime = 
this.statsLogger.getOpStatsLogger(ReplicationStats.PLACEMENT_POLICY_CHECK_TIME);
-        replicasCheckTime = 
this.statsLogger.getOpStatsLogger(ReplicationStats.REPLICAS_CHECK_TIME);
-        auditBookiesTime = 
this.statsLogger.getOpStatsLogger(ReplicationStats.AUDIT_BOOKIES_TIME);
-        numLedgersChecked = 
this.statsLogger.getCounter(ReplicationStats.NUM_LEDGERS_CHECKED);
-        numFragmentsPerLedger = 
statsLogger.getOpStatsLogger(ReplicationStats.NUM_FRAGMENTS_PER_LEDGER);
-        numBookiesPerLedger = 
statsLogger.getOpStatsLogger(ReplicationStats.NUM_BOOKIES_PER_LEDGER);
-        numBookieAuditsDelayed = 
this.statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
-        numDelayedBookieAuditsCancelled = this.statsLogger
-                
.getCounter(ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED);
-        numReplicatedLedgers = 
this.statsLogger.getCounter(NUM_REPLICATED_LEDGERS);
-        numLedgersNotAdheringToPlacementPolicy = new Gauge<Integer>() {
-            @Override
-            public Integer getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Integer getSample() {
-                return ledgersNotAdheringToPlacementPolicyGuageValue.get();
-            }
-        };
-        
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY,
-                numLedgersNotAdheringToPlacementPolicy);
-        numLedgersSoftlyAdheringToPlacementPolicy = new Gauge<Integer>() {
-            @Override
-            public Integer getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Integer getSample() {
-                return ledgersSoftlyAdheringToPlacementPolicyGuageValue.get();
-            }
-        };
-        
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY,
-                numLedgersSoftlyAdheringToPlacementPolicy);
-
-        numUnderreplicatedLedgersElapsedRecoveryGracePeriod = new 
Gauge<Integer>() {
-            @Override
-            public Integer getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Integer getSample() {
-                return 
numOfURLedgersElapsedRecoveryGracePeriodGuageValue.get();
-            }
-        };
-        
this.statsLogger.registerGauge(ReplicationStats.NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD,
-                numUnderreplicatedLedgersElapsedRecoveryGracePeriod);
-
-        numLedgersHavingNoReplicaOfAnEntry = new Gauge<Integer>() {
-            @Override
-            public Integer getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Integer getSample() {
-                return numLedgersHavingNoReplicaOfAnEntryGuageValue.get();
-            }
-        };
-        
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_HAVING_NO_REPLICA_OF_AN_ENTRY,
-                numLedgersHavingNoReplicaOfAnEntry);
-        numLedgersHavingLessThanAQReplicasOfAnEntry = new Gauge<Integer>() {
-            @Override
-            public Integer getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Integer getSample() {
-                return 
numLedgersHavingLessThanAQReplicasOfAnEntryGuageValue.get();
-            }
-        };
-        
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_HAVING_LESS_THAN_AQ_REPLICAS_OF_AN_ENTRY,
-                numLedgersHavingLessThanAQReplicasOfAnEntry);
-        numLedgersHavingLessThanWQReplicasOfAnEntry = new Gauge<Integer>() {
-            @Override
-            public Integer getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Integer getSample() {
-                return 
numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue.get();
-            }
-        };
-        
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_HAVING_LESS_THAN_WQ_REPLICAS_OF_AN_ENTRY,
-                numLedgersHavingLessThanWQReplicasOfAnEntry);
-        numUnderReplicatedLedgers = new Gauge<Integer>() {
-            @Override
-            public Integer getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Integer getSample() {
-                return underReplicatedLedgersGuageValue.get();
-            }
-        };
-        this.statsLogger.registerGauge(NUM_UNDER_REPLICATED_LEDGERS_GUAGE, 
numUnderReplicatedLedgers);
+        this.auditorStats = new AuditorStats(statsLogger);
 
         this.bkc = bkc;
         this.ownBkc = ownBkc;
         this.admin = admin;
         this.ownAdmin = ownAdmin;
         initialize(conf, bkc);
 
-        executor = Executors.newSingleThreadScheduledExecutor(new 
ThreadFactory() {
+        ledgerCheckerExecutor = Executors.newSingleThreadExecutor(new 
ThreadFactory() {

Review Comment:
   We can define and initialize `ledgerCheckerExecutor` in 
   AuditorCheckAllLedgersTask, and add an abstract method `shutdown()` to 
   AuditorTask. If a task needs to clean up resources, it can override 
   `shutdown()`, and Auditor's shutdown() can then invoke task.shutdown().



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to