horizonzy commented on code in PR #3637:
URL: https://github.com/apache/bookkeeper/pull/3637#discussion_r1102236736
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java:
##########
@@ -343,167 +154,43 @@ public Auditor(final String bookieIdentifier,
StatsLogger statsLogger)
throws UnavailableException {
this.conf = conf;
- this.underreplicatedLedgerRecoveryGracePeriod =
conf.getUnderreplicatedLedgerRecoveryGracePeriod();
- this.zkOpTimeoutMs = conf.getZkTimeout() * 2;
this.bookieIdentifier = bookieIdentifier;
- this.statsLogger = statsLogger;
- this.numOfLedgersFoundNotAdheringInPlacementPolicyCheck = new
AtomicInteger(0);
- this.ledgersNotAdheringToPlacementPolicyGuageValue = new
AtomicInteger(0);
- this.numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck = new
AtomicInteger(0);
- this.ledgersSoftlyAdheringToPlacementPolicyGuageValue = new
AtomicInteger(0);
- this.numOfClosedLedgersAuditedInPlacementPolicyCheck = new
AtomicInteger(0);
- this.numOfURLedgersElapsedRecoveryGracePeriod = new AtomicInteger(0);
- this.numOfURLedgersElapsedRecoveryGracePeriodGuageValue = new
AtomicInteger(0);
- this.numLedgersHavingNoReplicaOfAnEntryGuageValue = new
AtomicInteger(0);
- this.numLedgersFoundHavingNoReplicaOfAnEntry = new AtomicInteger(0);
- this.numLedgersHavingLessThanAQReplicasOfAnEntryGuageValue = new
AtomicInteger(0);
- this.numLedgersFoundHavingLessThanAQReplicasOfAnEntry = new
AtomicInteger(0);
- this.numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue = new
AtomicInteger(0);
- this.numLedgersFoundHavingLessThanWQReplicasOfAnEntry = new
AtomicInteger(0);
-
- if (conf.getAuditorMaxNumberOfConcurrentOpenLedgerOperations() <= 0) {
- LOG.error("auditorMaxNumberOfConcurrentOpenLedgerOperations should
be greater than 0");
- throw new
UnavailableException("auditorMaxNumberOfConcurrentOpenLedgerOperations should
be greater than 0");
- }
- this.openLedgerNoRecoverySemaphore = new
Semaphore(conf.getAuditorMaxNumberOfConcurrentOpenLedgerOperations());
-
- if (conf.getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec()
< 0) {
- LOG.error("auditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec
should be greater than or equal to 0");
- throw new
UnavailableException("auditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec "
- + "should be greater than or equal to 0");
- }
- this.openLedgerNoRecoverySemaphoreWaitTimeoutMSec =
-
conf.getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec();
-
- this.underReplicatedLedgersGuageValue = new AtomicInteger(0);
- numUnderReplicatedLedger =
this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS);
- underReplicatedLedgerTotalSize =
this.statsLogger.getOpStatsLogger(UNDER_REPLICATED_LEDGERS_TOTAL_SIZE);
- uRLPublishTimeForLostBookies = this.statsLogger
-
.getOpStatsLogger(ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE);
- bookieToLedgersMapCreationTime = this.statsLogger
-
.getOpStatsLogger(ReplicationStats.BOOKIE_TO_LEDGERS_MAP_CREATION_TIME);
- checkAllLedgersTime =
this.statsLogger.getOpStatsLogger(ReplicationStats.CHECK_ALL_LEDGERS_TIME);
- placementPolicyCheckTime =
this.statsLogger.getOpStatsLogger(ReplicationStats.PLACEMENT_POLICY_CHECK_TIME);
- replicasCheckTime =
this.statsLogger.getOpStatsLogger(ReplicationStats.REPLICAS_CHECK_TIME);
- auditBookiesTime =
this.statsLogger.getOpStatsLogger(ReplicationStats.AUDIT_BOOKIES_TIME);
- numLedgersChecked =
this.statsLogger.getCounter(ReplicationStats.NUM_LEDGERS_CHECKED);
- numFragmentsPerLedger =
statsLogger.getOpStatsLogger(ReplicationStats.NUM_FRAGMENTS_PER_LEDGER);
- numBookiesPerLedger =
statsLogger.getOpStatsLogger(ReplicationStats.NUM_BOOKIES_PER_LEDGER);
- numBookieAuditsDelayed =
this.statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
- numDelayedBookieAuditsCancelled = this.statsLogger
-
.getCounter(ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED);
- numReplicatedLedgers =
this.statsLogger.getCounter(NUM_REPLICATED_LEDGERS);
- numLedgersNotAdheringToPlacementPolicy = new Gauge<Integer>() {
- @Override
- public Integer getDefaultValue() {
- return 0;
- }
-
- @Override
- public Integer getSample() {
- return ledgersNotAdheringToPlacementPolicyGuageValue.get();
- }
- };
-
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY,
- numLedgersNotAdheringToPlacementPolicy);
- numLedgersSoftlyAdheringToPlacementPolicy = new Gauge<Integer>() {
- @Override
- public Integer getDefaultValue() {
- return 0;
- }
-
- @Override
- public Integer getSample() {
- return ledgersSoftlyAdheringToPlacementPolicyGuageValue.get();
- }
- };
-
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY,
- numLedgersSoftlyAdheringToPlacementPolicy);
-
- numUnderreplicatedLedgersElapsedRecoveryGracePeriod = new
Gauge<Integer>() {
- @Override
- public Integer getDefaultValue() {
- return 0;
- }
-
- @Override
- public Integer getSample() {
- return
numOfURLedgersElapsedRecoveryGracePeriodGuageValue.get();
- }
- };
-
this.statsLogger.registerGauge(ReplicationStats.NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD,
- numUnderreplicatedLedgersElapsedRecoveryGracePeriod);
-
- numLedgersHavingNoReplicaOfAnEntry = new Gauge<Integer>() {
- @Override
- public Integer getDefaultValue() {
- return 0;
- }
-
- @Override
- public Integer getSample() {
- return numLedgersHavingNoReplicaOfAnEntryGuageValue.get();
- }
- };
-
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_HAVING_NO_REPLICA_OF_AN_ENTRY,
- numLedgersHavingNoReplicaOfAnEntry);
- numLedgersHavingLessThanAQReplicasOfAnEntry = new Gauge<Integer>() {
- @Override
- public Integer getDefaultValue() {
- return 0;
- }
-
- @Override
- public Integer getSample() {
- return
numLedgersHavingLessThanAQReplicasOfAnEntryGuageValue.get();
- }
- };
-
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_HAVING_LESS_THAN_AQ_REPLICAS_OF_AN_ENTRY,
- numLedgersHavingLessThanAQReplicasOfAnEntry);
- numLedgersHavingLessThanWQReplicasOfAnEntry = new Gauge<Integer>() {
- @Override
- public Integer getDefaultValue() {
- return 0;
- }
-
- @Override
- public Integer getSample() {
- return
numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue.get();
- }
- };
-
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_HAVING_LESS_THAN_WQ_REPLICAS_OF_AN_ENTRY,
- numLedgersHavingLessThanWQReplicasOfAnEntry);
- numUnderReplicatedLedgers = new Gauge<Integer>() {
- @Override
- public Integer getDefaultValue() {
- return 0;
- }
-
- @Override
- public Integer getSample() {
- return underReplicatedLedgersGuageValue.get();
- }
- };
- this.statsLogger.registerGauge(NUM_UNDER_REPLICATED_LEDGERS_GUAGE,
numUnderReplicatedLedgers);
+ this.auditorStats = new AuditorStats(statsLogger);
this.bkc = bkc;
this.ownBkc = ownBkc;
this.admin = admin;
this.ownAdmin = ownAdmin;
initialize(conf, bkc);
- executor = Executors.newSingleThreadScheduledExecutor(new
ThreadFactory() {
+ ledgerCheckerExecutor = Executors.newSingleThreadExecutor(new
ThreadFactory() {
Review Comment:
We can define and initialize it in AuditorCheckAllLedgersTask. And add an
abstract method `shutdown()` in the AuditorTask, if we need to do some resource
cleaning, we can override the shutdown(). In Auditor shutdown(), to invoke
task.shutdown();
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]