horizonzy commented on code in PR #3637:
URL: https://github.com/apache/bookkeeper/pull/3637#discussion_r1098298475
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java:
##########
@@ -341,155 +153,48 @@ public Auditor(final String bookieIdentifier,
StatsLogger statsLogger)
throws UnavailableException {
this.conf = conf;
- this.underreplicatedLedgerRecoveryGracePeriod =
conf.getUnderreplicatedLedgerRecoveryGracePeriod();
- this.zkOpTimeoutMs = conf.getZkTimeout() * 2;
this.bookieIdentifier = bookieIdentifier;
- this.statsLogger = statsLogger;
- this.numOfLedgersFoundNotAdheringInPlacementPolicyCheck = new
AtomicInteger(0);
- this.ledgersNotAdheringToPlacementPolicyGuageValue = new
AtomicInteger(0);
- this.numOfLedgersFoundSoftlyAdheringInPlacementPolicyCheck = new
AtomicInteger(0);
- this.ledgersSoftlyAdheringToPlacementPolicyGuageValue = new
AtomicInteger(0);
- this.numOfClosedLedgersAuditedInPlacementPolicyCheck = new
AtomicInteger(0);
- this.numOfURLedgersElapsedRecoveryGracePeriod = new AtomicInteger(0);
- this.numOfURLedgersElapsedRecoveryGracePeriodGuageValue = new
AtomicInteger(0);
- this.numLedgersHavingNoReplicaOfAnEntryGuageValue = new
AtomicInteger(0);
- this.numLedgersFoundHavingNoReplicaOfAnEntry = new AtomicInteger(0);
- this.numLedgersHavingLessThanAQReplicasOfAnEntryGuageValue = new
AtomicInteger(0);
- this.numLedgersFoundHavingLessThanAQReplicasOfAnEntry = new
AtomicInteger(0);
- this.numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue = new
AtomicInteger(0);
- this.numLedgersFoundHavingLessThanWQReplicasOfAnEntry = new
AtomicInteger(0);
+ this.auditorStats = new AuditorStats(statsLogger);
if (conf.getAuditorMaxNumberOfConcurrentOpenLedgerOperations() <= 0) {
LOG.error("auditorMaxNumberOfConcurrentOpenLedgerOperations should
be greater than 0");
throw new
UnavailableException("auditorMaxNumberOfConcurrentOpenLedgerOperations should
be greater than 0");
}
- this.openLedgerNoRecoverySemaphore = new
Semaphore(conf.getAuditorMaxNumberOfConcurrentOpenLedgerOperations());
+ Semaphore openLedgerNoRecoverySemaphore =
+ new
Semaphore(conf.getAuditorMaxNumberOfConcurrentOpenLedgerOperations());
if (conf.getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec()
< 0) {
LOG.error("auditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec
should be greater than or equal to 0");
throw new
UnavailableException("auditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec "
+ "should be greater than or equal to 0");
}
- this.openLedgerNoRecoverySemaphoreWaitTimeoutMSec =
+ int openLedgerNoRecoverySemaphoreWaitTimeoutMSec =
conf.getAuditorAcquireConcurrentOpenLedgerOperationsTimeoutMSec();
- this.underReplicatedLedgersGuageValue = new AtomicInteger(0);
- numUnderReplicatedLedger =
this.statsLogger.getOpStatsLogger(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS);
- underReplicatedLedgerTotalSize =
this.statsLogger.getOpStatsLogger(UNDER_REPLICATED_LEDGERS_TOTAL_SIZE);
- uRLPublishTimeForLostBookies = this.statsLogger
-
.getOpStatsLogger(ReplicationStats.URL_PUBLISH_TIME_FOR_LOST_BOOKIE);
- bookieToLedgersMapCreationTime = this.statsLogger
-
.getOpStatsLogger(ReplicationStats.BOOKIE_TO_LEDGERS_MAP_CREATION_TIME);
- checkAllLedgersTime =
this.statsLogger.getOpStatsLogger(ReplicationStats.CHECK_ALL_LEDGERS_TIME);
- placementPolicyCheckTime =
this.statsLogger.getOpStatsLogger(ReplicationStats.PLACEMENT_POLICY_CHECK_TIME);
- replicasCheckTime =
this.statsLogger.getOpStatsLogger(ReplicationStats.REPLICAS_CHECK_TIME);
- auditBookiesTime =
this.statsLogger.getOpStatsLogger(ReplicationStats.AUDIT_BOOKIES_TIME);
- numLedgersChecked =
this.statsLogger.getCounter(ReplicationStats.NUM_LEDGERS_CHECKED);
- numFragmentsPerLedger =
statsLogger.getOpStatsLogger(ReplicationStats.NUM_FRAGMENTS_PER_LEDGER);
- numBookiesPerLedger =
statsLogger.getOpStatsLogger(ReplicationStats.NUM_BOOKIES_PER_LEDGER);
- numBookieAuditsDelayed =
this.statsLogger.getCounter(ReplicationStats.NUM_BOOKIE_AUDITS_DELAYED);
- numDelayedBookieAuditsCancelled = this.statsLogger
-
.getCounter(ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED);
- numReplicatedLedgers =
this.statsLogger.getCounter(NUM_REPLICATED_LEDGERS);
- numLedgersNotAdheringToPlacementPolicy = new Gauge<Integer>() {
- @Override
- public Integer getDefaultValue() {
- return 0;
- }
-
- @Override
- public Integer getSample() {
- return ledgersNotAdheringToPlacementPolicyGuageValue.get();
- }
- };
-
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY,
- numLedgersNotAdheringToPlacementPolicy);
- numLedgersSoftlyAdheringToPlacementPolicy = new Gauge<Integer>() {
- @Override
- public Integer getDefaultValue() {
- return 0;
- }
-
- @Override
- public Integer getSample() {
- return ledgersSoftlyAdheringToPlacementPolicyGuageValue.get();
- }
- };
-
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY,
- numLedgersSoftlyAdheringToPlacementPolicy);
-
- numUnderreplicatedLedgersElapsedRecoveryGracePeriod = new
Gauge<Integer>() {
- @Override
- public Integer getDefaultValue() {
- return 0;
- }
-
- @Override
- public Integer getSample() {
- return
numOfURLedgersElapsedRecoveryGracePeriodGuageValue.get();
- }
- };
-
this.statsLogger.registerGauge(ReplicationStats.NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD,
- numUnderreplicatedLedgersElapsedRecoveryGracePeriod);
-
- numLedgersHavingNoReplicaOfAnEntry = new Gauge<Integer>() {
- @Override
- public Integer getDefaultValue() {
- return 0;
- }
-
- @Override
- public Integer getSample() {
- return numLedgersHavingNoReplicaOfAnEntryGuageValue.get();
- }
- };
-
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_HAVING_NO_REPLICA_OF_AN_ENTRY,
- numLedgersHavingNoReplicaOfAnEntry);
- numLedgersHavingLessThanAQReplicasOfAnEntry = new Gauge<Integer>() {
- @Override
- public Integer getDefaultValue() {
- return 0;
- }
-
- @Override
- public Integer getSample() {
- return
numLedgersHavingLessThanAQReplicasOfAnEntryGuageValue.get();
- }
- };
-
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_HAVING_LESS_THAN_AQ_REPLICAS_OF_AN_ENTRY,
- numLedgersHavingLessThanAQReplicasOfAnEntry);
- numLedgersHavingLessThanWQReplicasOfAnEntry = new Gauge<Integer>() {
- @Override
- public Integer getDefaultValue() {
- return 0;
- }
-
- @Override
- public Integer getSample() {
- return
numLedgersHavingLessThanWQReplicasOfAnEntryGuageValue.get();
- }
- };
-
this.statsLogger.registerGauge(ReplicationStats.NUM_LEDGERS_HAVING_LESS_THAN_WQ_REPLICAS_OF_AN_ENTRY,
- numLedgersHavingLessThanWQReplicasOfAnEntry);
- numUnderReplicatedLedgers = new Gauge<Integer>() {
- @Override
- public Integer getDefaultValue() {
- return 0;
- }
-
- @Override
- public Integer getSample() {
- return underReplicatedLedgersGuageValue.get();
- }
- };
- this.statsLogger.registerGauge(NUM_UNDER_REPLICATED_LEDGERS_GUAGE,
numUnderReplicatedLedgers);
-
this.bkc = bkc;
this.ownBkc = ownBkc;
this.admin = admin;
this.ownAdmin = ownAdmin;
initialize(conf, bkc);
+ AuditorTask.ShutdownTaskHandler shutdownTaskHandler =
this::submitShutdownTask;
+ AuditorTask.SubmitTaskHandler submitBookieCheckTaskHandler =
this::submitBookieCheckTask;
Review Comment:
It's only used by AuditorBookieCheckTask, so it should not be defined in AuditorTask.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]